In [None]:

import getpass
import os, time, json, ast, re, uuid, random
from typing import List, Dict, Any

import numpy as np
import pandas as pd
from tqdm import tqdm

# Qdrant imports
try:
    from qdrant_client import QdrantClient
    from qdrant_client.http.models import Distance, VectorParams, PayloadSchemaType, Filter, FieldCondition, Range
except Exception:
    from qdrant_client import QdrantClient
    from qdrant_client.models import Distance, VectorParams, PayloadSchemaType, Filter, FieldCondition, Range  # fallback

# -------------------- USER CONFIG --------------------
CSV_PATH = "/content/drive/MyDrive/DATA/fda_maude_anomaly_ready.csv"
COLLECTION_NAME = "fda_maude_rag"
QDRANT_URL = "https://20851a9b-65fb-47d0-982e-38fdfc7d76f8.europe-west3-0.gcp.cloud.qdrant.io"

# SECURITY: the API key must never be stored in the notebook. It is read from
# the QDRANT_API_KEY environment variable, falling back to an interactive
# prompt. Any key that was previously committed in this file should be
# considered leaked and rotated in the Qdrant console.
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY") or getpass.getpass("Qdrant API key: ")
if not QDRANT_API_KEY:
    raise ValueError("QDRANT_API_KEY is required (set the environment variable or enter it at the prompt).")

# Fixed namespace so that uuid5-based point ids are reproducible across runs.
ID_NAMESPACE = uuid.UUID("8c6b1d0d-7a8c-4c50-bc2e-7e4d5f6a1b23")

# Columns whose values are joined to form the uuid5 key of each row.
ID_COLS_FOR_UUID = [
    "mdr_report_key", "device_sequence_no", "report_number",
    "date_received", "model_number", "row_in_group"
]

EMBED_COL = "foi_embedding"      # CSV column holding the serialized embedding
VECTOR_NAME = "foi_embedding"    # named vector inside the Qdrant collection
UPSERT_BATCH = 1000              # points per upsert request
VERIFY_BATCH = 1000              # ids per retrieve request during verification
RETRY_TIMES = 3                  # upsert attempts per batch
RETRY_BACKOFF = 2.0              # sleep is RETRY_BACKOFF ** attempt seconds
FILL_MISSING_EMBEDDINGS = True   # True: rows without a usable embedding get a zero vector

# Columns to upload (minimal payload + derived date fields)
PAYLOAD_COLS = [
    "mdr_report_key", "report_number",
    "date_received",
    "year", "date_int",
    "brand_name", "generic_name", "manufacturer_d_name",
    "event_type", "adverse_event_flag", "product_problem_flag",
    "model_number",
    "foi_text",
    "anomaly_score", "reconstruction_error", "ae_anomaly", "anomaly_flag",
]
# -----------------------------------------------------

client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
print("Connected to Qdrant:", QDRANT_URL)


Connected to Qdrant: https://20851a9b-65fb-47d0-982e-38fdfc7d76f8.europe-west3-0.gcp.cloud.qdrant.io


In [None]:

def _finite_float_vector(items):
    """Return ``items`` as a non-empty list of finite floats, or None if that is not possible."""
    try:
        vec = [float(x) for x in items]
    except Exception:
        return None
    # An empty vector or one containing NaN/inf cannot be indexed; report it as missing.
    if not vec or not np.isfinite(vec).all():
        return None
    return vec


def parse_embedding(val):
    """Parse a serialized embedding into a list of floats.

    Accepted inputs: list/tuple/ndarray, JSON or Python list literals,
    ``array([...])`` reprs (with or without a trailing ``dtype=``), and
    semicolon-, comma- or whitespace-separated numbers, optionally wrapped
    in square brackets.

    Returns:
        A non-empty list of finite floats, or None when the value is missing,
        empty, unparsable, or contains NaN/inf. A lone whitespace-separated
        token (e.g. ``"5"``) is not considered an embedding and yields None.
    """
    if val is None:
        return None
    if isinstance(val, (list, tuple, np.ndarray)):
        return _finite_float_vector(val)

    s = str(val).strip()
    if not s or s.lower() in {"none", "nan"}:
        return None

    if s.startswith("array(") and s.endswith(")"):
        s = s[len("array("):-1].strip()
        # Drop anything after the closing bracket, e.g. ", dtype=float32".
        close = s.rfind("]")
        if close != -1:
            s = s[:close + 1]

    if s.startswith("[") and s.endswith("]"):
        # JSON first (fast path), then Python literal syntax (e.g. "1." or tuples).
        for loader in (json.loads, ast.literal_eval):
            try:
                arr = loader(s)
            except Exception:
                continue
            if isinstance(arr, (list, tuple)):
                vec = _finite_float_vector(arr)
                if vec is not None:
                    return vec

    # Delimiter-based fallbacks work on a single line with the outer brackets removed.
    flat = s.replace("\n", " ").replace("\r", " ").replace("\t", " ").strip("[]")
    for sep in (";", ","):
        if sep in flat:
            vec = _finite_float_vector(tok for tok in flat.split(sep) if tok.strip())
            if vec is not None:
                return vec

    vec = _finite_float_vector(tok for tok in re.split(r"\s+", flat) if tok)
    return vec if vec is not None and len(vec) > 1 else None

def coerce_payload(d: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``d`` whose values are safe to send as a JSON payload.

    * numpy bool/integer/floating scalars become Python ``bool``/``int``/``float``;
    * NaN and +/-inf floats (Python or numpy, any width) become None;
    * scalar missing markers recognised by ``pd.isna`` (None, pd.NA, NaT) become None;
    * every other value, including list-like values, is passed through unchanged.
    """
    out = {}
    for k, v in d.items():
        # Normalise numpy scalars first so the checks below see plain Python types.
        if isinstance(v, np.bool_):
            v = bool(v)
        elif isinstance(v, np.integer):
            v = int(v)
        elif isinstance(v, np.floating):
            v = float(v)

        if isinstance(v, float):
            out[k] = v if np.isfinite(v) else None
        elif pd.api.types.is_scalar(v) and pd.isna(v):
            # is_scalar guard: pd.isna on a list/array returns an array, whose
            # truth value is ambiguous.
            out[k] = None
        else:
            out[k] = v
    return out

def chunker(seq, size):
    """Yield consecutive slices of ``seq``, each holding at most ``size`` items."""
    yield from (seq[lo:lo + size] for lo in range(0, len(seq), size))

def upsert_batch(points):
    """Upsert ``points`` into COLLECTION_NAME, retrying on failure.

    Up to RETRY_TIMES attempts are made (at least one). After a failed attempt
    that will be retried, the function sleeps RETRY_BACKOFF ** attempt seconds.

    Raises:
        Exception: the error of the final attempt, re-raised unchanged.
    """
    attempts = max(1, RETRY_TIMES)  # guarantee at least one try even if misconfigured
    for attempt in range(1, attempts + 1):
        try:
            client.upsert(collection_name=COLLECTION_NAME, points=points, wait=True)
            return
        except Exception as e:
            if attempt == attempts:
                # No further retry: fail immediately instead of sleeping first.
                raise
            sleep_s = RETRY_BACKOFF ** attempt
            print(f"Upsert retry {attempt}/{RETRY_TIMES} after error: {e.__class__.__name__}: {e}. Sleeping {sleep_s:.1f}s...")
            time.sleep(sleep_s)


In [None]:

print("Loading CSV:", CSV_PATH)
df = pd.read_csv(CSV_PATH, low_memory=False)
print("Rows:", len(df), " Columns:", len(df.columns))

# --- Standardize date field ---
# Accept the correctly spelled column or the misspelled 'date_recevied' variant.
if "date_received" in df.columns:
    df["date_received_std"] = df["date_received"]
elif "date_recevied" in df.columns:
    df["date_received_std"] = df["date_recevied"]
else:
    raise ValueError("Neither 'date_received' nor 'date_recevied' found in CSV.")

# Derived integer date fields used for range filtering; unparsable dates become <NA>.
# NOTE(review): if the column is read as integers such as 20200115, pd.to_datetime
# interprets them as epoch nanoseconds — confirm the CSV stores parseable date strings.
dr = pd.to_datetime(df["date_received_std"], errors="coerce")
df["year"] = dr.dt.year.astype("Int64")
df["date_int"] = pd.to_numeric(dr.dt.strftime("%Y%m%d"), errors="coerce").astype("Int64")

# From here on 'date_received' always exists, whichever spelling the CSV used.
df["date_received"] = df["date_received_std"]

# Stable sort so that ties keep their CSV order and row_in_group is reproducible.
sort_cols = [c for c in ["mdr_report_key", "device_sequence_no", "date_received", "report_number", "model_number"] if c in df.columns]
df = df.sort_values(sort_cols, kind="mergesort").reset_index(drop=True)

# Running counter inside each (report, device) group; it disambiguates rows that
# share every other id column.
grp_cols = [c for c in ["mdr_report_key", "device_sequence_no"] if c in df.columns]
if grp_cols:
    df["row_in_group"] = df.groupby(grp_cols, dropna=False).cumcount()
else:
    # Neither grouping column exists: treat the whole frame as a single group
    # rather than grouping by a column that is not there.
    df["row_in_group"] = np.arange(len(df))


def make_row_uid(row) -> str:
    """Build a deterministic UUIDv5 string for ``row``.

    The values of ID_COLS_FOR_UUID are stringified and joined with "|"; a
    column that is absent from the row or holds a missing value contributes
    the literal "<NA>". The joined key is hashed under ID_NAMESPACE.
    """
    key = "|".join(
        str(row[col] if col in row and pd.notna(row[col]) else "<NA>")
        for col in ID_COLS_FOR_UUID
    )
    return str(uuid.uuid5(ID_NAMESPACE, key))


# Make sure every id column exists; absent ones are filled with the "<NA>" placeholder.
for k in [col for col in ID_COLS_FOR_UUID if col not in df.columns]:
    df[k] = "<NA>"

# One UUIDv5 per row; it must be unique because it becomes the Qdrant point id.
point_ids = df.apply(make_row_uid, axis=1)
df["point_id"] = point_ids
n_distinct_ids = df["point_id"].nunique()
assert n_distinct_ids == len(df), "point_id not unique — check ID_COLS_FOR_UUID / row_in_group logic."

print("✅ Deterministic UUIDs generated for all rows.")


Loading CSV: /content/drive/MyDrive/DATA/fda_maude_anomaly_ready.csv
Rows: 462682  Columns: 50
✅ Deterministic UUIDs generated for all rows.


In [None]:


# Columns needed for the upload: payload fields, the raw embedding and the point id.
keep_cols = sorted({*PAYLOAD_COLS, EMBED_COL, "point_id"})
missing_cols = [c for c in keep_cols if c not in df.columns]
if missing_cols:
    raise ValueError(f"Missing required columns for upload: {missing_cols}")
df_up = df[keep_cols].copy()

# The embedding size is taken from the first row whose embedding parses to a
# non-empty vector; parsing stops as soon as one is found.
vec_size = next(
    (len(candidate) for candidate in map(parse_embedding, df_up[EMBED_COL]) if candidate),
    None,
)
if vec_size is None:
    raise ValueError("Could not parse any embeddings. Check EMBED_COL content/format.")
print("Detected embedding size:", vec_size)

# Zero vector substituted for rows without an embedding (None disables the substitution).
if FILL_MISSING_EMBEDDINGS:
    default_vec = [0.0] * vec_size
else:
    default_vec = None


Detected embedding size: 384


In [None]:


# Create the collection only when it does not exist yet.
# NOTE(review): an existing collection is reused as-is; its vector size and
# distance are not compared with vec_size / COSINE here — confirm they match.
existing = [c.name for c in client.get_collections().collections]
if COLLECTION_NAME not in existing:
    print("Creating collection:", COLLECTION_NAME)
    client.create_collection(
        collection_name=COLLECTION_NAME,
        vectors_config={VECTOR_NAME: VectorParams(size=vec_size, distance=Distance.COSINE)}
    )
else:
    print("Using existing collection:", COLLECTION_NAME)

# Payload fields that get an index, with the schema type used for each.
INDEX_FIELDS = [
    ("brand_name", PayloadSchemaType.KEYWORD),
    ("generic_name", PayloadSchemaType.KEYWORD),
    ("manufacturer_d_name", PayloadSchemaType.KEYWORD),
    ("event_type", PayloadSchemaType.KEYWORD),
    ("adverse_event_flag", PayloadSchemaType.KEYWORD),
    ("product_problem_flag", PayloadSchemaType.KEYWORD),
    ("model_number", PayloadSchemaType.KEYWORD),
    ("year", PayloadSchemaType.INTEGER),
    ("date_int", PayloadSchemaType.INTEGER),
]
for field, schema in INDEX_FIELDS:
    try:
        try:
            client.create_payload_index(COLLECTION_NAME, field, schema)
        except TypeError:
            # Fallback for client versions that reject the positional form.
            client.create_payload_index(collection_name=COLLECTION_NAME, field_name=field, field_schema=schema)
        print(f"Index ensured on '{field}'.")
    except Exception as e:
        # Index creation stays best-effort, but a failure is reported instead of
        # being hidden: a missing index makes filtered searches slow.
        print(f"WARNING: could not create index on '{field}': {e.__class__.__name__}: {e}")


Creating collection: fda_maude_rag
Index ensured on 'brand_name'.
Index ensured on 'generic_name'.
Index ensured on 'manufacturer_d_name'.
Index ensured on 'event_type'.
Index ensured on 'adverse_event_flag'.
Index ensured on 'product_problem_flag'.
Index ensured on 'model_number'.
Index ensured on 'year'.
Index ensured on 'date_int'.


In [None]:

def build_point(row: pd.Series):
    """Convert one dataframe row into a Qdrant point dict.

    The embedding in ``row[EMBED_COL]`` is parsed; an embedding that cannot be
    parsed, or whose length differs from ``vec_size``, is treated as missing.
    A missing embedding is replaced by ``default_vec`` when that is set.

    Returns:
        ``{"id", "vector", "payload"}`` for the row, or None when the
        embedding is missing and no default vector is configured.
    """
    vec = parse_embedding(row[EMBED_COL])
    # A vector of the wrong size would make the whole upsert batch fail.
    if vec is not None and len(vec) != vec_size:
        vec = None
    if vec is None:
        if default_vec is None:
            return None
        vec = default_vec
    payload = coerce_payload({c: row.get(c, None) for c in PAYLOAD_COLS if c in row})
    return {
        "id": row["point_id"],            # UUIDv5 string id
        "vector": {VECTOR_NAME: vec},
        "payload": payload,
    }

print("Starting upload...")
skipped = 0
total_rows = len(df_up)

# The context manager closes the progress bar even if an upsert finally fails.
with tqdm(total=total_rows, desc="Upserting", unit="rows") as pbar:
    # Walk the frame in windows of UPSERT_BATCH rows without building an index list.
    for start in range(0, total_rows, UPSERT_BATCH):
        stop = min(start + UPSERT_BATCH, total_rows)
        points = []
        for i in range(start, stop):
            pt = build_point(df_up.iloc[i])
            if pt is None:
                # Only happens when the embedding is missing and no default vector is set.
                skipped += 1
                continue
            points.append(pt)
        if points:
            upsert_batch(points)
        pbar.update(stop - start)

print(f"Upload complete. Skipped (missing embeddings without fill): {skipped}")


Starting upload...


Upserting: 100%|██████████| 462682/462682 [24:20<00:00, 316.72rows/s]

Upload complete. Skipped (missing embeddings without fill): 0





In [None]:

# Compare the exact server-side point count with the number of rows prepared for upload.
# The check is informational only: a failure is printed and the notebook continues.
try:
    count_result = client.count(collection_name=COLLECTION_NAME, exact=True)
    q_count = count_result.count
    print(f"Qdrant count: {q_count} | CSV rows: {len(df_up)}")
except Exception as e:
    print("Count check failed:", e)

# Every point id, in dataframe order; consumed by the reconciliation step.
ids = df["point_id"].tolist()

def verify_and_reconcile():
    """Check that every id in ``ids`` exists in the collection and re-upsert the rest.

    Ids are looked up in chunks of VERIFY_BATCH. A chunk whose lookup raises is
    treated as entirely missing, so unverified ids are never assumed present.
    Missing rows are rebuilt from ``df_up`` and upserted again, then re-checked.

    Returns:
        The list of ids still missing after the retry (empty when all are present).
    """
    missing = []
    for id_chunk in tqdm(list(chunker(ids, VERIFY_BATCH)), desc="Verifying IDs", unit="ids"):
        try:
            found = client.retrieve(collection_name=COLLECTION_NAME, ids=id_chunk, with_payload=False, with_vectors=False)
        except Exception as e:
            # Could not verify this chunk: queue all of it for re-upsert. Upserting
            # an id that already exists just rewrites the same point.
            print("Retrieve error; treating chunk as missing:", e)
            missing.extend(id_chunk)
            time.sleep(1)
            continue
        found_ids = {p.id for p in found}
        missing.extend(_id for _id in id_chunk if _id not in found_ids)

    print("Missing IDs after first pass:", len(missing))

    if not missing:
        print("No missing IDs detected.")
        return []

    # Re-upsert missing
    for miss_chunk in tqdm(list(chunker(missing, UPSERT_BATCH)), desc="Re-upserting missing", unit="ids"):
        # Build the mask from df_up itself so mask and frame always share an index.
        sub = df_up[df_up["point_id"].isin(miss_chunk)]
        pts = []
        for _, row in sub.iterrows():
            pt = build_point(row)
            if pt is not None:
                pts.append(pt)
        if pts:
            upsert_batch(pts)

    # Re-check
    still = []
    for id_chunk in chunker(missing, VERIFY_BATCH):
        found = client.retrieve(collection_name=COLLECTION_NAME, ids=id_chunk, with_payload=False, with_vectors=False)
        found_ids = {p.id for p in found}
        still.extend([_id for _id in id_chunk if _id not in found_ids])
    print("Still missing after retry:", len(still))
    return still

verify_and_reconcile()
print("Done.")


Qdrant count: 462682 | CSV rows: 462682


Verifying IDs: 100%|██████████| 463/463 [01:26<00:00,  5.37ids/s]

Missing IDs after first pass: 0
No missing IDs detected.
Done.



