In [8]:
import os
import io
import boto3
import pandas as pd
from tqdm import tqdm
from urllib.parse import urlparse, parse_qs, urlunparse, unquote

# Bucket that is read for the contracts spreadsheet and searched for contract files.
S3_BUCKET = "ml-legal-restricted"
# Object key of the Excel export passed to fetch_excel_data.
EXCEL_KEY = "tabularData/Active Legal Contracts 8-1-2025 10-54-06 AM.xlsx"
# Local CSV that receives the rows for which a matching S3 object was found.
OUTPUT_CSV = "contracts_files.csv"

# Module-level S3 client shared by the helpers in this cell.
s3 = boto3.client("s3")

# Excel column header -> output column name. fetch_excel_data raises ValueError
# if any of these headers is missing from the sheet.
COLUMN_MAP = {
    "Contract Number": "contract_number",
    "Account": "account_name",
    "Document Type": "doc_type",
    "Status Reason": "status_reason",
    "Contract Title": "contract_title",
    "Contract Requester": "contract_requester",
    "Reviewing Attorney": "reviewing_attorney",
    "Created On": "created_on",
    "Document Effective Date": "document_effective_date",
    "Parent Contract": "parent_contract",
    "Solution Line": "solution_line",
    "Account Type": "account_type",
    "Document URL": "sharepoint_document_url",
    "Document Title": "document_title",
    "Related Product": "related_product",
}

def normalize_url(u: str) -> str:
    """
    Normalize a URL so that small formatting differences don't break matches.

    - Strip surrounding whitespace
    - Lowercase scheme/host
    - Decode % encodings on the path (so %20 -> space)
    - Remove trailing slashes on the path

    Characters that act as URL delimiters ("?", "#", ";") are re-escaped after
    decoding, so that parsing the normalized URL again yields the same path
    instead of cutting it at a decoded delimiter.

    Returns "" for non-string or blank input, and the stripped input unchanged
    when it cannot be parsed as a URL.
    """
    if not isinstance(u, str):
        return ""
    u = u.strip()
    if not u:
        return ""
    try:
        p = urlparse(u)
    except ValueError:
        # urlparse rejects some malformed authorities (e.g. broken IPv6 hosts).
        return u

    path = unquote(p.path or "")
    # A decoded "?", "#" or ";" would be read as the start of the query,
    # fragment or params the next time this URL is parsed.
    path = path.replace("?", "%3F").replace("#", "%23").replace(";", "%3B")
    if len(path) > 1 and path.endswith("/"):
        path = path.rstrip("/")

    return urlunparse(
        (p.scheme.lower(), p.netloc.lower(), path, p.params, p.query, p.fragment)
    )

def fetch_excel_data(bucket: str, key: str, sheet_name="Active Legal Contracts") -> pd.DataFrame:
    """Download the contracts workbook from S3 and return the cleaned rows.

    Steps: read ``sheet_name`` from the workbook, keep only the COLUMN_MAP
    columns, drop rows without a contract number, rename the columns to their
    output names, normalize the document URL, drop rows whose URL is blank and
    add a ``file_name`` column derived from the URL.

    Raises:
        RuntimeError: the object could not be downloaded (original error is
            attached as ``__cause__``).
        ValueError: one or more COLUMN_MAP headers are missing from the sheet.
    """
    print(f"📥 Downloading Excel: s3://{bucket}/{key}")
    try:
        excel_data = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    except Exception as e:
        # Chain the cause so the underlying boto error stays in the traceback.
        raise RuntimeError(f"Failed to download Excel from s3://{bucket}/{key}: {e}") from e

    df = pd.read_excel(io.BytesIO(excel_data), sheet_name=sheet_name, engine="openpyxl")
    missing_cols = [col for col in COLUMN_MAP if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing columns in Excel: {missing_cols}")

    # .copy() so the column assignments below never write into a view.
    df = df[list(COLUMN_MAP)].dropna(subset=["Contract Number"]).copy()
    df["Contract Number"] = df["Contract Number"].astype(str).str.strip()

    df = df.rename(columns=COLUMN_MAP)

    df["sharepoint_document_url"] = (
        df["sharepoint_document_url"].fillna("").astype(str).apply(normalize_url)
    )

    # Rows without a document URL cannot be matched to a file.
    df = df[df["sharepoint_document_url"].str.strip() != ""].copy()

    def _filename_from_url(u: str) -> str:
        """Return the ``file`` query value if present, else the path's last segment."""
        try:
            p = urlparse(u)
            qs = parse_qs(p.query)
            if "file" in qs and qs["file"]:
                return unquote(qs["file"][0])
            return os.path.basename(unquote(p.path or ""))
        except Exception:
            return ""

    df["file_name"] = df["sharepoint_document_url"].apply(_filename_from_url)

    return df

def find_matching_contract_file(
    bucket: str,
    contract_number: str,
    target_filename: str,
    prefix_base="contract-docs/",
) -> str | None:
    """
    Search under contract-docs/{contract_number}/ and return the *single* key
    that exactly matches the target file name. If not found, return None.
    """
    if not target_filename:
        return None

    prefix = f"{prefix_base}{contract_number}/"
    paginator = s3.get_paginator("list_objects_v2")

    try:
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj.get("Key", "")
                if not key or key.endswith("/"):
                    continue
                if os.path.basename(key) == target_filename:
                    return key
    except Exception as e:
        print(f"⚠️  Error listing S3 for prefix {prefix}: {e}")

    return None

def process_contracts_with_metadata(
    bucket: str,
    excel_key: str,
    output_csv: str,
    missing_csv: str = "missing_files.csv",
):
    """Match every spreadsheet row to its S3 object and write the results.

    Rows with a matching object are written to ``output_csv`` (metadata columns
    plus ``s3_full_path``). Rows without a match are written to ``missing_csv``.
    Purely numeric contract numbers shorter than 8 digits are retried
    zero-padded to 8 digits.

    Failures are reported with ``print`` and the function returns None in all
    cases; it does not raise.
    """
    try:
        df = fetch_excel_data(bucket, excel_key)
    except Exception as e:
        print(f"❌ Failed to read Excel: {e}")
        return

    # Output column order, de-duplicated while keeping first occurrence
    # ("contract_number" is also one of the COLUMN_MAP values).
    ordered_cols = list(
        dict.fromkeys(
            ["contract_number", "s3_full_path", *COLUMN_MAP.values(), "file_name"]
        )
    )

    output_rows = []
    not_found_rows = []

    print(f"\n🔍 Processing {len(df)} contracts...")
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing"):
        try:
            contract_number = str(row.get("contract_number", "")).strip()
            file_name = str(row.get("file_name", "")).strip()

            key = find_matching_contract_file(bucket, contract_number, file_name)

            if not key and contract_number.isdigit() and len(contract_number) < 8:
                padded = contract_number.zfill(8)
                key = find_matching_contract_file(bucket, padded, file_name)

            if key:
                entry = {csv_col: row.get(csv_col, "") for csv_col in COLUMN_MAP.values()}
                entry["contract_number"] = contract_number
                entry["s3_full_path"] = f"s3://{bucket}/{key}"
                entry["file_name"] = file_name
                output_rows.append(entry)
            else:
                not_found_rows.append(
                    {
                        "contract_number": contract_number,
                        "file_name": file_name,
                        "sharepoint_document_url": row.get("sharepoint_document_url", ""),
                    }
                )
        except Exception as e:
            print(f"⚠️  Error processing contract '{row.get('contract_number', '')}': {e}")

    # Passing the columns explicitly keeps the header even when nothing matched.
    output_df = pd.DataFrame(output_rows, columns=ordered_cols)

    try:
        output_df.to_csv(output_csv, index=False)
    except Exception as e:
        print(f"❌ Failed to write output CSV '{output_csv}': {e}")
        return

    print(f"\n✅ Contracts with matching files saved to: {output_csv}")
    print(f"Total contracts with matches: {output_df['contract_number'].nunique()}")
    print(f"Total matched file rows: {len(output_df)}")

    if not_found_rows:
        missing_df = pd.DataFrame(not_found_rows)
        try:
            missing_df.to_csv(missing_csv, index=False)
        except Exception as e:
            print(f"❌ Failed to write missing-files CSV '{missing_csv}': {e}")
            return
        print(f"\n❗ Could not find matching S3 files for {len(missing_df)} contracts.")
        print(f"Details saved to: {missing_csv}")

# Run the spreadsheet-to-S3 matching job when executed as the main program.
if __name__ == "__main__":
    process_contracts_with_metadata(S3_BUCKET, EXCEL_KEY, OUTPUT_CSV)

📥 Downloading Excel: s3://ml-legal-restricted/tabularData/Active Legal Contracts 8-1-2025 10-54-06 AM.xlsx


  warn(msg)



🔍 Processing 14644 contracts...


Processing: 100%|██████████| 14644/14644 [06:47<00:00, 35.90it/s]



✅ Contracts with matching files saved to: contracts_files.csv
Total contracts with matches: 12676
Total matched file rows: 12676

❗ Could not find matching S3 files for 1968 contracts.
Details saved to: missing_files.csv


In [1]:
import os
import json
import requests
import numpy as np

# Endpoint that get_text_embedding POSTs to. The identifier is spelled
# "EMBEDINNGS" and is referenced under that name by get_text_embedding.
EMBEDINNGS_URL = "https://zgggzg2iqg.execute-api.us-east-1.amazonaws.com/dev/get_embeddings"
# NOTE(review): this credential is committed in source. It should be rotated
# and loaded from the environment or a secrets manager instead.
API_KEY = "2jIpWCyNRg3Y8lkbmWG0tkyXwYlJn5QaZ1F3yKf7"

def _extract_embeddings_obj(obj):

    if isinstance(obj, dict) and "embeddings" in obj:
        return obj["embeddings"]

    if isinstance(obj, dict) and "body" in obj:
        try:
            body = obj["body"]
            if isinstance(body, str):
                inner = json.loads(body)
            else:
                inner = body
            if isinstance(inner, dict) and "embeddings" in inner:
                return inner["embeddings"]
        except Exception:
            pass

    raise KeyError("No 'embeddings' found in response object")

def get_text_embedding(texts, model='e5_mistral_embed_384', timeout=8):
    """Request an embedding vector for each text from the embeddings endpoint.

    Args:
        texts: a single string or a non-empty list of strings.
        model: value sent as ``model_name`` in the request payload.
        timeout: per-request timeout passed to ``requests.post``.

    Returns:
        For one input text: its vector (list of floats), or None if the
        request failed. For several texts: a list with one entry per text, in
        order, where failed entries are None.

    Raises:
        ValueError: ``texts`` is not a string or non-empty list, or an item is
            not a string.
    """
    if isinstance(texts, str):
        texts = [texts]
    if not isinstance(texts, list) or not texts:
        raise ValueError("Input 'texts' must be a non-empty list of strings.")

    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json"
    }

    out = []
    for text in texts:
        if not isinstance(text, str):
            raise ValueError("Each item in 'texts' must be a string.")

        payload = {"model_name": model, "texts": [text]}

        # Reset per iteration so the debug output below can never show the
        # response that belonged to a previous text.
        resp = None
        try:
            resp = requests.post(EMBEDINNGS_URL, json=payload, headers=headers, timeout=timeout)
            resp.raise_for_status()

            embeddings = _extract_embeddings_obj(resp.json())

            if (not isinstance(embeddings, list)) or len(embeddings) != 1 or (not isinstance(embeddings[0], list)):
                raise KeyError("Response did not contain a single embedding vector")

            # Round-trip through float32 so values match the stored precision.
            out.append(np.array(embeddings[0], dtype=np.float32).tolist())

        except Exception as e:
            print(f"[ERROR] Failed to get embedding for '{text}': {e}")
            if resp is not None:
                print(f"[DEBUG] HTTP {resp.status_code} body: {resp.text[:500]}")
            out.append(None)

    return out[0] if len(out) == 1 else out

In [26]:
import os
import uuid
import pandas as pd
from functools import lru_cache
from typing import Dict, List
import copy
import math


def _ensure_nltk():
    """Import nltk and make sure its sentence-tokenizer data is present.

    Returns the nltk module, or None if the import or the "punkt" setup
    raises. A failed download of "punkt_tab" is ignored.
    """
    try:
        import nltk

        for resource, package, must_succeed in (
            ("tokenizers/punkt", "punkt", True),
            ("tokenizers/punkt_tab", "punkt_tab", False),
        ):
            try:
                nltk.data.find(resource)
            except LookupError:
                try:
                    nltk.download(package)
                except Exception:
                    if must_succeed:
                        raise
    except Exception:
        return None
    return nltk

def _sent_tokenize(text: str) -> List[str]:
    """Split ``text`` into sentences and always return a list.

    Uses nltk's ``sent_tokenize`` when nltk and its tokenizer data are
    available. Otherwise falls back to splitting on whitespace that follows
    ".", "!" or "?", so callers can iterate the result unconditionally.
    """
    nltk = _ensure_nltk()
    if nltk is not None:
        try:
            from nltk.tokenize import sent_tokenize
            return sent_tokenize(text)
        except LookupError:
            # Tokenizer data is still missing (e.g. download was not possible).
            pass

    import re

    pieces = re.split(r"(?<=[.!?])\s+", text.strip())
    return [piece for piece in pieces if piece]

@lru_cache(maxsize=1)
def get_tokenizer(tokenizer_name: str = "intfloat/e5-small-v2"):
    """Load the tokenizer for ``tokenizer_name``.

    The result is memoized with a cache of size one, so repeated calls with
    the same name reuse the loaded tokenizer.
    """
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)
    return tokenizer

# Metadata keys that _shape_metadata deletes before building the grouped fields.
EXCLUDE_KEYS = {"file_name", "contract_number", "opensearch", "s3_vectors"}
# Metadata keys that _shape_metadata deletes after the grouped "dates",
# "attorneys" and "account_details" lists have been built.
# NOTE(review): _shape_metadata reads the account from "account_name", which is
# not listed here, while "client_account" is — confirm this is intended.
OLD_KEYS_TO_PRUNE = {
    "created_on", "document_effective_date", "contract_requester",
    "reviewing_attorney", "client_account", "parent_contract",
    "account_type", "related_product", "sharepoint_document_url", "document_title"
}

def _sanitize_value(v):
    import math
    if v is None:
        return "None"
    if isinstance(v, float):
        if math.isnan(v):
            return "None"
        if v.is_integer():
            return str(int(v))
        else:
            return str(v)
    if isinstance(v, (int, str)):
        return str(v)
    return str(v)


def _shape_metadata(meta: Dict) -> Dict:
    """Reshape a metadata dict in place and return it.

    Removes the EXCLUDE_KEYS entries, groups several fields into the "dates",
    "attorneys" and "account_details" lists (absent fields become "None"),
    removes the OLD_KEYS_TO_PRUNE entries, and finally converts every
    remaining value (or list element) to a string via _sanitize_value.
    """
    for excluded in [k for k in meta if k in EXCLUDE_KEYS]:
        del meta[excluded]

    def _field(name):
        return meta.get(name, "None")

    meta["solution_line"] = _field("solution_line")
    meta["dates"] = [
        _field("created_on"),
        _field("document_effective_date"),
        _field("document_title"),
    ]
    meta["attorneys"] = [_field("contract_requester"), _field("reviewing_attorney")]
    meta["account_details"] = [
        _field("account_name"),
        _field("parent_contract"),
        _field("account_type"),
        _field("related_product"),
    ]

    for stale in OLD_KEYS_TO_PRUNE:
        meta.pop(stale, None)

    for name in list(meta):
        value = meta[name]
        if isinstance(value, list):
            meta[name] = [_sanitize_value(item) for item in value]
        else:
            meta[name] = _sanitize_value(value)

    return meta

def chunk_text_and_build_chunks(
    text: str,
    meta: Dict,
    token_limit: int = 400,
    safety_margin: int = 16,
    tokenizer_name: str = "intfloat/e5-small-v2"
) -> List[Dict]:
    """Split ``text`` into token-bounded chunks and attach shaped metadata.

    Sentences are packed greedily into chunks of at most ``hard_cap`` tokens,
    where ``hard_cap = max(16, min(token_limit, model_max - safety_margin))``.
    A sentence (or a packed group that re-encodes longer than expected) that
    exceeds the cap is cut into consecutive windows of ``hard_cap`` tokens and
    decoded back to text.

    Args:
        text: document text; empty or non-string input yields ``[]``.
        meta: metadata copied into every chunk (never mutated here).
        token_limit: requested maximum tokens per chunk.
        safety_margin: tokens reserved below the tokenizer's maximum length.
        tokenizer_name: tokenizer identifier passed to get_tokenizer.

    Returns:
        One ``{"key": <uuid4 str>, "metadata": <shaped dict with "text">}``
        per chunk, in document order.
    """
    if not text or not isinstance(text, str):
        return []

    tokenizer = get_tokenizer(tokenizer_name)
    model_max = getattr(tokenizer, "model_max_length", 512)
    # Some tokenizers report a huge sentinel as "no limit"; fall back to 512.
    if model_max is None or model_max > 4096:
        model_max = 512
    hard_cap = max(16, min(token_limit, model_max - safety_margin))

    def _encode(piece: str) -> List[int]:
        return tokenizer.encode(piece, add_special_tokens=False)

    def _token_windows(ids: List[int], source: str) -> List[str]:
        """Decode ``ids`` in consecutive windows of at most ``hard_cap`` tokens."""
        pieces: List[str] = []
        for start in range(0, len(ids), hard_cap):
            seg_text = tokenizer.decode(
                ids[start:start + hard_cap],
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True,
            )
            if not seg_text.strip():
                # Fallback: approximate slice of the raw text.
                seg_text = source[start:start + 1000]
                if not seg_text:
                    break
            pieces.append(seg_text)
        return pieces

    chunks_text: List[str] = []

    def _emit(sentences: List[str]) -> None:
        """Append the joined sentences as one chunk, windowing if over the cap."""
        joined = " ".join(sentences)
        joined_ids = _encode(joined)
        if len(joined_ids) <= hard_cap:
            chunks_text.append(joined)
        else:
            # Re-encoding the joined text can come out longer than the sum of
            # the per-sentence counts; still flush so the chunk cannot grow
            # without bound.
            chunks_text.extend(_token_windows(joined_ids, joined))

    sents = _sent_tokenize(text)
    if sents is None:
        # No sentence split available: treat the whole text as one sentence
        # and let the window splitting below bound the chunk size.
        sents = [text]

    current_sents: List[str] = []
    tok_count = 0

    for sent in sents:
        sent_ids = _encode(sent)
        tlen = len(sent_ids)

        # Flush the current chunk if adding this sentence would exceed the cap.
        if current_sents and tok_count + tlen > hard_cap:
            _emit(current_sents)
            current_sents = []
            tok_count = 0

        # A single over-long sentence is split by token windows on its own.
        if tlen > hard_cap:
            chunks_text.extend(_token_windows(sent_ids, sent))
            continue

        current_sents.append(sent)
        tok_count += tlen

    if current_sents:
        _emit(current_sents)

    # Deep-copy the metadata per chunk because _shape_metadata mutates its input.
    out: List[Dict] = []
    for ch in chunks_text:
        meta_copy = copy.deepcopy(meta)
        meta_copy["text"] = ch
        out.append({
            "key": str(uuid.uuid4()),
            "metadata": _shape_metadata(meta_copy)
        })

    return out


In [None]:
import os
import io
import tempfile
import pandas as pd
from typing import List, Dict, Tuple
import textract
import extract_msg
import pytesseract
from pdf2image import convert_from_bytes
from PyPDF2 import PdfReader
import boto3

# Shared boto3 clients: "s3" is used to download documents, "s3vectors" to upload vectors.
s3 = boto3.client("s3")
s3v = boto3.client("s3vectors", region_name="us-east-1")

# Extensions process_documents will attempt; any other extension is counted as unsupported.
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".txt", ".msg", ".doc"}

def parse_s3_uri(s3_uri: str) -> Tuple[str, str]:
    """Split ``s3://bucket/key`` into ``(bucket, key)``.

    Raises:
        ValueError: the value is not a string, lacks the ``s3://`` prefix, or
            has an empty bucket or key.
    """
    # Non-string values (e.g. NaN read from a CSV) are reported as ValueError
    # too, so callers only need to handle one exception type.
    if not isinstance(s3_uri, str) or not s3_uri.startswith("s3://"):
        raise ValueError(f"Invalid S3 URI: {s3_uri}")

    bucket, _, key = s3_uri[5:].partition("/")
    if not bucket or not key:
        raise ValueError(f"Invalid S3 URI: {s3_uri}")
    return bucket, key

def download_s3_file(bucket: str, key: str) -> io.BytesIO:
    """Fetch ``s3://bucket/key`` and return its full content as an in-memory buffer."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]
    return io.BytesIO(body.read())


def extract_from_doc(file_io: io.BytesIO) -> str:
    """Extract text from a legacy .doc file via textract.

    The bytes are written to a temporary ``.doc`` file because textract is
    given a path. The temporary file is removed whether or not extraction
    succeeds. Returns "" (after printing the error) on any failure.
    """
    tmp_path = None
    try:
        file_io.seek(0)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".doc") as tmp:
            tmp_path = tmp.name
            tmp.write(file_io.read())
        return textract.process(tmp_path).decode("utf-8", errors="ignore").strip()
    except Exception as e:
        print(f"❗ DOC (textract) error: {e}")
        return ""
    finally:
        # Clean up even when extraction raised, so failures do not leak files.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

def extract_with_ocr(file_io: io.BytesIO) -> str:
    """Render the file's pages to images and OCR each one.

    Page texts are joined with newlines. Returns "" (after printing the
    error) on any failure.
    """
    try:
        file_io.seek(0)
        page_images = convert_from_bytes(file_io.read())
        page_texts = [pytesseract.image_to_string(image) for image in page_images]
        return "\n".join(page_texts).strip()
    except Exception as e:
        print(f"❗ OCR failed: {e}")
        return ""

def extract_from_pdf(file_io: io.BytesIO) -> str:
    """Return the embedded text of a PDF, one page per line group.

    Pages whose extracted text is blank are skipped. Returns "" (after
    printing the error) on any failure.
    """
    try:
        file_io.seek(0)
        page_texts = (page.extract_text() or "" for page in PdfReader(file_io).pages)
        return "\n".join(t for t in page_texts if t.strip()).strip()
    except Exception as e:
        print(f"❗ PDF read error: {e}")
        return ""

def extract_from_docx(file_io: io.BytesIO) -> str:
    """Return the paragraph text of a .docx file, joined with newlines.

    Returns "" (after printing the error) on any failure, including a missing
    python-docx installation.
    """
    try:
        from docx import Document
        from io import BytesIO

        file_io.seek(0)
        document = Document(BytesIO(file_io.read()))
        lines = [paragraph.text for paragraph in document.paragraphs]
        return "\n".join(lines).strip()
    except Exception as e:
        print(f"❗ DOCX error: {e}")
        return ""

def extract_from_txt(file_io: io.BytesIO) -> str:
    """Decode the buffer as UTF-8 (undecodable bytes dropped) and strip it.

    Returns "" (after printing the error) on any failure.
    """
    try:
        file_io.seek(0)
        raw = file_io.read()
        decoded = raw.decode("utf-8", errors="ignore")
        return decoded.strip()
    except Exception as e:
        print(f"❗ TXT read error: {e}")
        return ""

def extract_from_msg(file_io: io.BytesIO) -> str:
    """Return the stripped body of an Outlook .msg file.

    The bytes are written to a temporary ``.msg`` file that is opened with
    extract_msg. The message handle is closed and the temporary file removed
    whether or not parsing succeeds. Returns "" (after printing the error) on
    any failure.
    """
    tmp_path = None
    msg = None
    try:
        file_io.seek(0)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".msg") as tmp:
            tmp_path = tmp.name
            tmp.write(file_io.read())
        msg = extract_msg.Message(tmp_path)
        return (msg.body or "").strip()
    except Exception as e:
        print(f"❗ MSG read error: {e}")
        return ""
    finally:
        # Release the message's file handle first, then delete the temp file.
        close = getattr(msg, "close", None)
        if callable(close):
            try:
                close()
            except Exception:
                pass  # best-effort cleanup; the result is already decided
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass

def extract_text(file_io: io.BytesIO, ext: str) -> Tuple[str, str]:
    """Run the extractors registered for ``ext`` until one returns text.

    Returns ``(text, extractor_function_name)`` for the first extractor that
    yields non-empty text, or ``("", "none")`` when none does or the extension
    has no extractors. The extension is matched case-insensitively.
    """
    pipeline_by_ext = {
        ".pdf": [extract_from_pdf, extract_with_ocr],
        ".docx": [extract_from_docx, extract_with_ocr],
        ".doc": [extract_from_doc],
        ".txt": [extract_from_txt],
        ".msg": [extract_from_msg],
    }
    for attempt in pipeline_by_ext.get(ext.lower(), []):
        # Each extractor gets the buffer from the start.
        file_io.seek(0)
        extracted = attempt(file_io)
        if not extracted:
            continue
        return extracted, attempt.__name__
    return "", "none"

def batched(iterable, n):
    """Yield successive lists of ``n`` items; the last list may be shorter."""
    pending = []
    for item in iterable:
        pending.append(item)
        if len(pending) != n:
            continue
        yield pending
        pending = []
    if pending:
        yield pending

def upload_chunks_to_s3_vector_index(
    chunks: List[Dict],
    batch_size: int = 500
):
    """Embed each chunk's text and upload the vectors in batches.

    Args:
        chunks: items shaped ``{"key": str, "metadata": {"text": str, ...}}``.
        batch_size: maximum number of vectors per ``put_vectors`` call.

    Returns:
        The list of ``put_vectors`` responses, one per batch. The list is
        empty when there are no chunks or no embedding could be obtained.
    """
    if not chunks:
        return []

    texts = [c["metadata"]["text"] for c in chunks]

    embeddings = get_text_embedding(texts)

    # get_text_embedding returns the bare vector (or None) for a single text;
    # wrap it so the result is always one entry per chunk.
    if len(texts) == 1:
        embeddings = [embeddings]

    vectors = [
        {"key": c["key"], "data": {"float32": emb}, "metadata": c["metadata"]}
        for c, emb in zip(chunks, embeddings)
        if emb is not None
    ]

    skipped = len(chunks) - len(vectors)
    if skipped:
        print(f"⚠️ {skipped} of {len(chunks)} chunks had no embedding and were not uploaded")

    responses = []
    for batch in batched(vectors, batch_size):
        resp = s3v.put_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=INDEX_NAME,
            vectors=batch
        )
        responses.append(resp)
    return responses

def mark_csv_file_processed(csv_path: str, s3_full_path: str) -> bool:
    """Set ``s3_vectors`` to True for the rows matching ``s3_full_path``.

    The CSV is read, updated and written back in full. Returns True when at
    least one row was updated, and False when the required columns are
    missing, no row matches, or any error occurs (the reason is printed).
    """
    try:
        frame = pd.read_csv(csv_path)
        required = ("s3_full_path", "s3_vectors")
        if any(column not in frame.columns for column in required):
            print(f"❗ Missing 's3_full_path' or 's3_vectors' in {csv_path}")
            return False

        matches = frame.index[frame["s3_full_path"] == s3_full_path]
        if matches.empty:
            print(f"ℹ️ Row not found in CSV for {s3_full_path}")
            return False

        frame.loc[matches, "s3_vectors"] = True
        frame.to_csv(csv_path, index=False)
        return True
    except Exception as e:
        print(f"❗ mark_csv_file_processed error: {e}")
        return False

def build_items_from_csv(csv_path: str) -> Tuple[List[Dict], pd.DataFrame]:
    """Build the work list from the rows not yet marked in ``s3_vectors``.

    A row counts as done when its ``s3_vectors`` value, as a lower-cased
    string, equals "true". Rows whose ``s3_full_path`` is not a usable S3 URI
    are skipped with a printed message.

    Returns:
        ``(items, df)`` where each item is
        ``{"bucket": str, "file_key": str, "metadata": <row as dict>}`` and
        ``df`` is the filtered frame of pending rows.

    Raises:
        ValueError: the CSV lacks ``s3_full_path`` or ``s3_vectors``.
    """
    df = pd.read_csv(csv_path)
    if "s3_vectors" not in df.columns or "s3_full_path" not in df.columns:
        raise ValueError("CSV must contain 's3_full_path' and 's3_vectors'")

    df["s3_vectors"] = df["s3_vectors"].astype(str).str.lower()
    df = df[df["s3_vectors"] != "true"].copy()

    items = []
    for _, row in df.iterrows():
        s3_path = row["s3_full_path"]
        try:
            b, k = parse_s3_uri(s3_path)
        except (ValueError, AttributeError, TypeError) as ve:
            # A blank cell is read as NaN (a float); skip it like any other
            # invalid path instead of aborting the whole build.
            print(f"❗ Skipping invalid s3 path ({s3_path}): {ve}")
            continue

        items.append({"bucket": b, "file_key": k, "metadata": row.to_dict()})
    return items, df

def process_documents(
    items: List[Dict],
    csv_path_to_mark: str
):
    """Download, extract, chunk, embed and upload each item, then report stats.

    Args:
        items: work items shaped ``{"bucket", "file_key", "metadata"}``.
        csv_path_to_mark: CSV in which successfully uploaded documents are
            marked; a falsy value skips marking.

    Returns:
        A dict of counters: "processed", "failed", one per supported
        extension, "ocr", "none" and "unsupported_ext". A document counts as
        processed only when at least one upload response was returned.
    """
    stats = {
        "processed": 0, "failed": 0,
        "pdf": 0, "docx": 0, "txt": 0, "msg": 0, "doc": 0,
        "ocr": 0, "none": 0, "unsupported_ext": 0
    }

    for idx, item in enumerate(items, 1):
        bucket = item["bucket"]
        key = item["file_key"]
        meta = dict(item.get("metadata", {}))
        s3_path = meta.get("s3_full_path")
        ext = os.path.splitext(key)[1].lower()

        print(f"📄 [{idx}/{len(items)}] {key}")

        if ext not in SUPPORTED_EXTENSIONS:
            print(f"⚠️ Unsupported extension: {ext} ({s3_path})")
            stats["unsupported_ext"] += 1
            stats["failed"] += 1
            continue

        try:
            file_io = download_s3_file(bucket, key)
            text, method = extract_text(file_io, ext)

            if not text:
                print("⚠️ No text extracted.")
                stats["failed"] += 1
                stats["none"] += 1
                continue

            chunks = chunk_text_and_build_chunks(text, meta)
            responses = upload_chunks_to_s3_vector_index(chunks=chunks)

            if not responses:
                # Nothing reached the index, so this is a failure and the row
                # stays unmarked for a later retry.
                print("❗ Upload returned no responses")
                stats["failed"] += 1
                continue

            if csv_path_to_mark:
                if mark_csv_file_processed(csv_path_to_mark, s3_path):
                    print(f"✅ Uploaded {len(chunks)} vectors and marked in CSV")
                else:
                    print("ℹ️ Uploaded vectors, but could not mark row in CSV")

            stats["processed"] += 1
            stats[ext.replace(".", "")] += 1
            if "ocr" in method:
                stats["ocr"] += 1

        except Exception as e:
            print(f"❗ Error processing {s3_path}: {e}")
            stats["failed"] += 1
            stats["none"] += 1

    # Summary
    print("\n=== Ingestion Summary ===")
    first = [f"Processed: {stats['processed']}", f"Failed: {stats['failed']}"]
    rest = [f"{k.upper()}: {v}" for k, v in stats.items() if k not in {"processed", "failed"}]
    print("  ".join(first))
    print("  ".join(rest))
    return stats

def run_ingestion(work_csv: str):
    """Ingest every pending row of ``work_csv`` and return the stats dict.

    Raises ValueError when the vector client, bucket name or index name is
    None. When there is nothing to process, returns
    ``{"processed": 0, "failed": 0}``.
    """
    if s3v is None or VECTOR_BUCKET_NAME is None or INDEX_NAME is None:
        raise ValueError("Please provide s3v, VECTOR_BUCKET_NAME, INDEX_NAME")

    pending_items, _pending_df = build_items_from_csv(work_csv)

    if not pending_items:
        print("Nothing to process. All rows are already marked or CSV is empty.")
        return {"processed": 0, "failed": 0}

    return process_documents(items=pending_items, csv_path_to_mark=work_csv)


# Destination vector bucket and index, read by upload_chunks_to_s3_vector_index
# and checked for None by run_ingestion.
VECTOR_BUCKET_NAME = "legal-docs-vector-store"
INDEX_NAME = 'token-chunking-new-files'

# Ingest the rows of this CSV that are not yet marked in its "s3_vectors" column.
run_ingestion(work_csv="gathered_contract_files_valid_new.csv")
