# Fraud PoC — robust & efficient LLM ingestion (batching, parallelism, retries)

This updated notebook includes comprehensive options to process transactions with an LLM while minimizing latency and handling timeouts:
- Modes: `batch` (recommended), `parallel`, `sequential`
- Adaptive batching with exponential backoff and fallback to smaller batches or per-item processing
- Non-streaming and streaming call implementations with retries
- Automatic model preloading and fallback to smaller models if resources are constrained
- Bulk inserts to DuckDB for efficiency
- Repair flow for missing/NaN risk scores

Usage: set environment variables if you want to override defaults:
- `FRAUD_DB_PATH` — path to duckdb file (default `notebooks/fraud_poc.duckdb`)
- `OLLAMA_URL` — default `http://localhost:11434/api/generate`
- `LLM_MODEL` — preferred model (default `olmo-3`)
- `PROCESS_MODE` — one of `batch`, `parallel`, `sequential`, or `auto` (default `auto`)

Run cells top-to-bottom; adjust `CONFIG` cell parameters to tune behavior for your environment.

In [None]:
# Basic environment checks and helper functions (memory/cpu and model availability)
import subprocess, shutil, sys, time
import json, re

def get_total_memory_gb():
    """Return total physical memory in GiB, or None when it cannot be determined.

    Only the Linux ``/proc/meminfo`` interface is consulted; on hosts without
    that file (or when it cannot be parsed) the function returns None.
    """
    try:
        with open('/proc/meminfo') as meminfo:
            # MemTotal is reported in kB: "MemTotal:  12345678 kB"
            total_kb = next(
                (int(entry.split()[1]) for entry in meminfo if entry.startswith('MemTotal:')),
                None,
            )
    except Exception:
        return None
    if total_kb is None:
        return None
    return total_kb / 1024.0 / 1024.0

def get_cpu_count():
    """Return the number of logical CPUs reported by the OS (never less than 1)."""
    # Local import so this helper does not rely on `os` having been imported
    # by some other cell; a missing import must not be masked as "1 core".
    import os

    # os.cpu_count() returns None when the count is undeterminable.
    return os.cpu_count() or 1

def is_model_present(model_name):
    """Report whether `model_name` appears in the output of ``ollama list``.

    The check is a plain substring match against the CLI's stdout. When the
    ``ollama`` executable is not on PATH the function returns False (the
    server may still have the model; it simply cannot be checked here).
    """
    try:
        listing = subprocess.run(
            ['ollama', 'list'],
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        return False
    return model_name in listing.stdout

def pull_model_if_missing(model_name):
    """Run ``ollama pull <model_name>`` and return True on success, False otherwise.

    Returns False both when the ``ollama`` CLI is not on PATH and when the
    pull command exits with a non-zero status; a message is printed in each case.
    """
    print('Attempting ollama pull', model_name)
    try:
        subprocess.run(['ollama', 'pull', model_name], check=True)
    except FileNotFoundError:
        print('ollama CLI not in PATH; skipping pull')
        return False
    except subprocess.CalledProcessError as pull_error:
        print('ollama pull failed:', pull_error)
        return False
    print('Model pull completed')
    return True

# Report host capabilities and whether the preferred model is visible to the ollama CLI.
print(f'Host CPU cores: {get_cpu_count()}')
print(f'Host memory GB (approx): {get_total_memory_gb()}')
print(f'ollama available in PATH? {shutil.which("ollama") is not None}')
print(f'preferred model present (cli check): {is_model_present(PREFERRED_MODEL)}')


Host CPU cores: 12
Host memory GB (approx): 83.47371673583984
ollama available in PATH? False
preferred model present (cli check): False


In [4]:
# Robust call helpers: non-streaming and streaming with retries
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def requests_session_with_retries(total_retries=3, backoff=1.0):
    """Create a requests Session whose HTTP and HTTPS adapters share a retry policy.

    Args:
        total_retries: value passed to ``Retry(total=...)``.
        backoff: value passed to ``Retry(backoff_factor=...)``.

    Returns:
        A new ``requests.Session`` with the adapters mounted.
    """
    # NOTE(review): urllib3's default allowed methods may exclude POST, in which
    # case the status_forcelist retries would not apply to POST calls made with
    # this session — confirm against the installed urllib3 version.
    policy = Retry(total=total_retries, backoff_factor=backoff, status_forcelist=[429,500,502,503,504])
    session = requests.Session()
    for scheme in ('http://', 'https://'):
        # A separate adapter instance per scheme.
        session.mount(scheme, HTTPAdapter(max_retries=policy))
    return session

def call_ollama_non_stream(prompt, model=PREFERRED_MODEL, ollama_url=OLLAMA_URL, timeout=300, session=None):
    """POST a single non-streaming generate request and return ``(parsed, text)``.

    Args:
        prompt: prompt text sent to the model.
        model: model name placed in the request payload.
        ollama_url: URL of the generate endpoint.
        timeout: requests timeout in seconds.
        session: optional ``requests.Session`` to reuse; when omitted a
            temporary session is created and closed before returning.

    Returns:
        A 2-tuple ``(parsed, text)``:
        * If the HTTP body is an Ollama envelope (a JSON object with a string
          ``response`` field), ``text`` is that field and ``parsed`` is the
          result of decoding it as JSON, or None when it is not valid JSON.
        * If the body is any other JSON document, ``parsed`` is that document
          and ``text`` is the raw body.
        * If the body is not JSON at all, ``parsed`` is None and ``text`` is
          the raw body.

    Raises:
        requests.HTTPError: for non-2xx responses (via ``raise_for_status``),
        plus any connection/timeout error raised by requests.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        # The generate endpoint streams newline-delimited chunks unless told
        # otherwise, which cannot be decoded as one JSON document.
        "stream": False,
        # Sampling parameters belong under "options"; num_predict caps the
        # number of generated tokens (raise it if large batches get truncated).
        "options": {"temperature": 0.0, "num_predict": 512},
    }
    owns_session = session is None
    sess = requests_session_with_retries() if owns_session else session
    try:
        r = sess.post(ollama_url, json=payload, timeout=timeout)
        r.raise_for_status()
        raw_text = r.text
        try:
            body = r.json()
        except ValueError:
            return None, raw_text
    finally:
        if owns_session:
            sess.close()

    # Unwrap the envelope so callers get the model's own output rather than
    # transport metadata.
    if isinstance(body, dict) and isinstance(body.get('response'), str):
        answer = body['response']
        try:
            return json.loads(answer), answer
        except ValueError:
            return None, answer
    return body, raw_text

def call_ollama_stream(prompt, model=PREFERRED_MODEL, ollama_url=OLLAMA_URL, timeout=600):
    """POST a streaming generate request and assemble the streamed answer.

    Retries are the caller's responsibility.

    Args:
        prompt: prompt text sent to the model.
        model: model name placed in the request payload.
        ollama_url: URL of the generate endpoint.
        timeout: requests timeout in seconds.

    Returns:
        ``(assembled, raw_lines)`` where ``assembled`` is the concatenation of
        every chunk's ``response`` text (or, only when no response text was
        received at all, the concatenated ``thinking`` text) and ``raw_lines``
        is the list of decoded chunks; lines that are not valid JSON are kept
        as ``{'text': line}``.

    Raises:
        requests.HTTPError: for non-2xx responses, plus any connection/timeout
        error raised by requests.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": True,
        # Sampling parameters belong under "options"; num_predict caps the
        # number of generated tokens.
        "options": {"temperature": 0.0, "num_predict": 512},
    }
    answer_parts = []
    thinking_parts = []
    raw_lines = []
    # The context manager releases the connection even when we stop reading
    # early (on the 'done' chunk) or an error interrupts the stream.
    with requests.post(ollama_url, json=payload, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        for line in r.iter_lines(decode_unicode=True):
            if not line:
                continue
            try:
                chunk = json.loads(line)
            except ValueError:
                raw_lines.append({'text': line})
                continue
            raw_lines.append(chunk)
            if not isinstance(chunk, dict):
                continue
            if chunk.get('response'):
                answer_parts.append(chunk['response'])
            elif chunk.get('thinking'):
                # Kept apart from the answer so reasoning text cannot corrupt
                # the JSON the caller tries to parse.
                thinking_parts.append(chunk['thinking'])
            if chunk.get('done'):
                break
    assembled = ''.join(answer_parts) or ''.join(thinking_parts)
    return assembled, raw_lines

print('Call helpers ready')


Call helpers ready


In [5]:
# Parsing helpers (same robust parser and extractor)
import numpy as np
def parse_risk_score(value):
    """Best-effort conversion of an LLM-provided risk value to a float.

    Accepted inputs and results:
    * None, booleans, and the strings ``""``, ``null``, ``none``, ``n/a``,
      ``na``, ``nan`` (case-insensitive) -> NaN.
    * Python / NumPy numbers -> returned as float, unchanged.
    * A JSON object containing ``risk_score``, ``score``, ``risk`` or
      ``riskScore`` -> that field, parsed recursively.
    * A JSON number -> returned as float, unchanged.
    * Text containing ``<number>%`` -> number / 100.
    * Text containing any other number -> NaN if negative, number / 100 if it
      lies in (1, 100], otherwise the number itself.
    * Anything else -> NaN.

    A decimal comma is accepted in free text (``"0,75"`` -> 0.75). The result
    is not clamped to [0, 1]; callers decide what to do with out-of-range values.
    """
    import json
    import math
    import re

    # bool is a subclass of int; a bare true/false is not a risk score and must
    # not be read as 1.0 / 0.0.
    if value is None or isinstance(value, (bool, np.bool_)):
        return math.nan
    if isinstance(value, (int, float, np.integer, np.floating)):
        return float(value)

    s = str(value).strip()
    try:
        obj = json.loads(s)
    except ValueError:
        obj = None
    if isinstance(obj, bool):
        return math.nan
    if isinstance(obj, dict):
        for key in ("risk_score", "score", "risk", "riskScore"):
            if key in obj:
                return parse_risk_score(obj[key])
    elif isinstance(obj, (int, float)):
        try:
            return float(obj)
        except OverflowError:
            # Integer literal too large for a float; fall through to the
            # text-based extraction below.
            pass

    if s.lower() in ("", "null", "none", "n/a", "na", "nan"):
        return math.nan

    # Explicit percentage, e.g. "85%" or "12,5 %".
    m = re.search(r'(-?\d+(?:[.,]\d+)?)\s*%', s)
    if m:
        return float(m.group(1).replace(',', '.')) / 100.0

    # First bare number in the text.
    m = re.search(r'(-?\d+(?:[.,]\d+)?)', s)
    if m:
        num = float(m.group(1).replace(',', '.'))
        if num < 0:
            return math.nan
        if 1 < num <= 100:
            # Values such as "40" are read as a percentage.
            return num / 100.0
        return num
    return math.nan

def extract_final_text_from_response(raw):
    """Pick the most likely "answer" out of a raw LLM response.

    Non-empty lines are scanned from last to first:
    * a line that is not valid JSON is returned as-is;
    * a JSON object yields its ``risk_score`` / ``score`` / ``risk`` /
      ``riskScore`` value if one of those keys is present, otherwise its
      truthy ``response`` or ``thinking`` value;
    * a JSON number is returned directly;
    * any other JSON value is skipped and the scan moves to the previous line.

    Returns '' for None, and the whole text when no line produced a result.
    """
    if raw is None:
        return ''
    whole = str(raw)
    score_keys = ("risk_score", "score", "risk", "riskScore")
    for candidate in reversed(whole.splitlines()):
        candidate = candidate.strip()
        if not candidate:
            continue
        try:
            decoded = json.loads(candidate)
        except Exception:
            # Plain text line: treat it as the answer.
            return candidate
        if isinstance(decoded, dict):
            hit = next((k for k in score_keys if k in decoded), None)
            if hit is not None:
                return decoded[hit]
            for fallback in ('response', 'thinking'):
                if decoded.get(fallback):
                    return decoded[fallback]
        elif isinstance(decoded, (int, float)):
            return decoded
    return whole

print('Parsing helpers ready')


Parsing helpers ready


In [6]:
# Bulk insert helper
import pandas as pd
import duckdb

def bulk_insert_llm_results(con, results):
    """Insert a list of result dicts into ``llm_results`` in one statement.

    Args:
        con: an open DuckDB connection.
        results: list of dicts; the dict keys are used as column names.

    Returns:
        The number of rows handed to the INSERT (0 for an empty list, in which
        case the database is not touched).
    """
    if len(results) == 0:
        return 0
    df = pd.DataFrame(results)
    # Make sure the needs_review column exists. This is best effort: a failure
    # here is reported, and the INSERT below will surface any real problem.
    try:
        con.execute('ALTER TABLE llm_results ADD COLUMN IF NOT EXISTS needs_review BOOLEAN DEFAULT FALSE')
    except Exception as exc:
        print('Could not ensure needs_review column exists:', exc)
    # Expose the dataframe to SQL under a temporary name for the INSERT only.
    con.register('batch_df', df)
    try:
        # Quote identifiers so unusual column names cannot break the statement.
        cols = ', '.join('"' + str(c).replace('"', '""') + '"' for c in df.columns)
        con.execute(f'INSERT INTO llm_results ({cols}) SELECT {cols} FROM batch_df')
    finally:
        # Drop the temporary view so the dataframe is not kept alive by the connection.
        con.unregister('batch_df')
    return len(results)

print('Bulk insert helper ready')


Bulk insert helper ready


In [7]:
# Adaptive batching runner with retries, fallback to smaller batch sizes and per-item processing
from math import ceil
import datetime

def _build_batch_prompt(tx_records):
    """Render the strict-JSON scoring prompt for a list of transaction dicts."""
    header = (
        'You are an assistant that evaluates transaction fraud risk.\n'
        'For each transaction below, return a JSON array where each element is an object: '
        '{"tx_id": <tx_id>, "risk_score": <number between 0 and 1>, "explanation": <short string>}\n'
        'Return ONLY the JSON array — no extra text.\n\nTransactions:\n'
    )
    body = ''.join(
        f"- tx_id={r['tx_id']} account={r['account_id']} amount={r['amount']} {r['currency']} merchant={r['merchant']} description={r['description']}\n"
        for r in tx_records
    )
    return header + body + '\nRespond with a JSON array matching the order of the transactions.'


def _score_field(entry):
    """Return the raw score of one result object (None when it has no score key)."""
    if not isinstance(entry, dict):
        return entry
    for key in ('risk_score', 'score'):
        if key in entry:
            return entry[key]
    return None


def _entries_from_reply(parsed, text):
    """Return the list of per-transaction result objects in an LLM reply, else None.

    Handles an already-decoded JSON array, a single result object, an Ollama
    envelope dict whose 'response' field holds the answer as text, and raw text
    that is either JSON or newline-delimited Ollama chunks.
    """
    if isinstance(parsed, list):
        return parsed
    candidates = []
    if isinstance(parsed, dict):
        if 'risk_score' in parsed or 'score' in parsed:
            return [parsed]
        if isinstance(parsed.get('response'), str):
            candidates.append(parsed['response'])
    if isinstance(text, str):
        candidates.append(text)
        # Newline-delimited chunks: stitch their 'response' fragments together.
        fragments = []
        for line in text.splitlines():
            try:
                chunk = json.loads(line)
            except ValueError:
                continue
            if isinstance(chunk, dict) and isinstance(chunk.get('response'), str):
                fragments.append(chunk['response'])
        if fragments:
            candidates.append(''.join(fragments))
    for candidate in candidates:
        try:
            decoded = json.loads(candidate)
        except ValueError:
            continue
        if isinstance(decoded, list):
            return decoded
        if isinstance(decoded, dict) and ('risk_score' in decoded or 'score' in decoded):
            return [decoded]
    return None


def process_batches_and_insert(DB_PATH, mode='batch', batch_size=BATCH_SIZE, min_batch=MIN_BATCH_SIZE, max_timeout=MAX_BATCH_CALL_TIMEOUT):
    """Score up to 1000 unprocessed transactions in batches and store the results.

    For each batch the LLM is asked for a JSON array with one object per
    transaction. Outcomes:
    * request fails (non-streaming and streaming) -> every row of the batch is
      stored with needs_review=True and the error text;
    * reply has no JSON array -> the batch is split in two and each half is
      retried; once a batch cannot be split below `min_batch`, its rows are
      scored one by one;
    * array length differs from the batch -> rows stored with needs_review=True;
    * otherwise results are matched to transactions by position; a row is
      flagged for review when its score is missing, NaN, outside [0, 1], or
      when the model echoed a different tx_id.

    All rows are written with one bulk insert at the end.

    Args:
        DB_PATH: path of the DuckDB database file to open.
        mode: accepted for call-site compatibility; not used by the function.
        batch_size: number of transactions per initial batch.
        min_batch: batches of this size or smaller are never split.
        max_timeout: upper bound (seconds) for a single batch call.

    Returns:
        Number of rows inserted into llm_results.
    """
    import math
    import uuid
    from collections import deque
    from contextlib import closing

    min_batch = max(1, int(min_batch))
    batch_size = max(1, int(batch_size))
    results_to_insert = []

    def add_row(tx_id, llm_response, parsed_response, parsed_val=None, force_review=False):
        # A score is only stored when it is a real number inside [0, 1].
        needs_review = (
            force_review
            or parsed_val is None
            or math.isnan(parsed_val)
            or parsed_val < 0
            or parsed_val > 1
        )
        results_to_insert.append({
            'id': str(uuid.uuid4()),
            'tx_id': tx_id,
            'llm_model': PREFERRED_MODEL,
            'llm_response': llm_response,
            'parsed_response': parsed_response,
            'risk_score': None if needs_review else float(parsed_val),
            'needs_review': needs_review,
            'created_at': datetime.datetime.utcnow(),
        })

    def ask(prompt, timeout, session):
        # Non-streaming first; if the request itself fails, try streaming once.
        try:
            parsed_json, text = call_ollama_non_stream(prompt, timeout=timeout, session=session)
            return parsed_json, text, None
        except Exception as first_err:
            print('Non-streaming call failed, trying streaming:', first_err)
            try:
                assembled, _raw_lines = call_ollama_stream(prompt, timeout=timeout)
                return None, assembled, None
            except Exception as second_err:
                return None, None, second_err

    def score_single(rec, session):
        # Per-item fallback: structured reply if possible, else free-text extraction.
        parsed_val = None
        reply_text = ''
        single_prompt = _build_batch_prompt([rec])
        try:
            one_parsed, one_text = call_ollama_non_stream(single_prompt, timeout=SEQUENTIAL_TIMEOUT, session=session)
            reply_text = one_text if isinstance(one_text, str) else ''
            one_entries = _entries_from_reply(one_parsed, one_text)
            if one_entries is not None and len(one_entries) == 1:
                parsed_val = parse_risk_score(_score_field(one_entries[0]))
            else:
                assembled_single, _raw_lines = call_ollama_stream(single_prompt, timeout=SEQUENTIAL_TIMEOUT)
                reply_text = assembled_single
                parsed_val = parse_risk_score(extract_final_text_from_response(assembled_single))
        except Exception as exc:
            print('Per-item call failed for tx', rec['tx_id'], exc)
        return parsed_val, reply_text

    # Open the database named by the caller instead of relying on a global connection.
    with closing(duckdb.connect(DB_PATH)) as db, requests_session_with_retries(total_retries=2, backoff=1) as sess:
        print(db.execute("SHOW TABLES").fetchall())
        rows = db.execute("""
    SELECT t.tx_id, t.account_id, t.amount, t.currency, t.merchant, t.description
    FROM transactions t
    LEFT JOIN llm_results l ON t.tx_id = l.tx_id
    WHERE l.id IS NULL
    LIMIT 1000
    """).df()
        print('Fetched', len(rows), 'unprocessed txs')
        records = rows.to_dict(orient='records')

        # Work queue of (offset, batch). Splitting pushes two strictly smaller
        # batches back on the front, so every batch is eventually resolved.
        pending = deque(
            (start, records[start:start + batch_size])
            for start in range(0, len(records), batch_size)
        )
        while pending:
            offset, batch = pending.popleft()
            prompt = _build_batch_prompt(batch)
            timeout = min(max_timeout, max(60, 60 * ceil(len(batch) / 2)))
            print(f'Calling batch starting at index {offset}, size {len(batch)}, timeout {timeout}s')
            parsed, text, err = ask(prompt, timeout, sess)

            if err is not None or (parsed is None and text is None):
                print('Batch call failed even after streaming; marking batch for review:', err)
                for rec in batch:
                    add_row(rec['tx_id'], '', json.dumps({'error': str(err)}), force_review=True)
                continue

            entries = _entries_from_reply(parsed, text)
            if entries is None:
                if len(batch) > min_batch:
                    print('Batch JSON parse failed — reducing batch size and retrying')
                    half = max(min_batch, len(batch) // 2)
                    pending.appendleft((offset + half, batch[half:]))
                    pending.appendleft((offset, batch[:half]))
                    continue
                print('Falling back to per-item processing for this batch')
                for rec in batch:
                    parsed_val, reply_text = score_single(rec, sess)
                    usable = parsed_val is not None and not math.isnan(parsed_val)
                    add_row(
                        rec['tx_id'],
                        reply_text,
                        json.dumps({'parsed_risk': parsed_val if usable else None}),
                        parsed_val,
                    )
                continue

            reply_text = text if isinstance(text, str) else json.dumps(parsed)
            if len(entries) != len(batch):
                print('Length mismatch between parsed JSON and batch; marking batch for review')
                for rec in batch:
                    add_row(rec['tx_id'], reply_text, json.dumps(entries), force_review=True)
                continue

            for rec, entry in zip(batch, entries):
                # The prompt asks for results in input order, so the row is keyed
                # by our own tx_id; a disagreeing echoed tx_id is flagged.
                echoed = entry.get('tx_id') if isinstance(entry, dict) else None
                mismatch = echoed is not None and str(echoed) != str(rec['tx_id'])
                add_row(
                    rec['tx_id'],
                    reply_text,
                    json.dumps(entry),
                    parse_risk_score(_score_field(entry)),
                    force_review=mismatch,
                )

        # Bulk insert all results
        inserted = bulk_insert_llm_results(db, results_to_insert)
    print('Inserted', inserted, 'rows')
    return inserted

print('Adaptive batch runner defined')


Adaptive batch runner defined


In [8]:
# Parallel per-tx runner (safe small concurrency)
from concurrent.futures import ThreadPoolExecutor, as_completed
def process_parallel_and_insert(db_path, workers=PARALLEL_WORKERS, max_items=200):
    """Score unprocessed transactions one per request using a small thread pool.

    Args:
        db_path: path of the DuckDB database file to open.
        workers: maximum number of concurrent LLM requests.
        max_items: maximum number of transactions fetched for this run.

    Returns:
        Number of rows inserted into llm_results.

    Each transaction yields exactly one row. A row is flagged needs_review
    (with risk_score NULL) when the request fails or the score is missing,
    NaN, or outside [0, 1]. Database access happens only on the calling
    thread; worker threads only talk to the LLM.
    """
    import math
    import uuid
    from contextlib import closing

    def make_row(tx_id, llm_response, parsed_response, risk_score, needs_review):
        return {
            'id': str(uuid.uuid4()), 'tx_id': tx_id, 'llm_model': PREFERRED_MODEL,
            'llm_response': llm_response, 'parsed_response': parsed_response,
            'risk_score': risk_score, 'needs_review': needs_review,
            'created_at': datetime.datetime.utcnow(),
        }

    def worker(tx):
        prompt = f"Transaction: account={tx['account_id']} amount={tx['amount']} {tx['currency']} merchant={tx['merchant']} description={tx['description']}\n\nReturn a JSON object with tx_id and risk_score between 0 and 1 and a short explanation."
        try:
            parsed_obj, text = call_ollama_non_stream(prompt, timeout=SEQUENTIAL_TIMEOUT)
            reply_text = text if isinstance(text, str) else ''
            raw_score = None
            if isinstance(parsed_obj, dict):
                # Explicit None test: a legitimate score of 0 must not be
                # discarded the way `a or b` would.
                for key in ('risk_score', 'score'):
                    if parsed_obj.get(key) is not None:
                        raw_score = parsed_obj[key]
                        break
            if raw_score is None:
                # No structured score: fall back to the streamed text.
                assembled, _raw_lines = call_ollama_stream(prompt, timeout=SEQUENTIAL_TIMEOUT)
                reply_text = assembled
                raw_score = extract_final_text_from_response(assembled)
        except Exception as e:
            return make_row(tx['tx_id'], '', json.dumps({'error': str(e)}), None, True)
        parsed_val = parse_risk_score(raw_score)
        needs_review = math.isnan(parsed_val) or parsed_val < 0 or parsed_val > 1
        parsed_db = None if needs_review else float(parsed_val)
        return make_row(tx['tx_id'], reply_text, json.dumps({'parsed_risk': parsed_db}), parsed_db, needs_review)

    # Open the database named by the caller instead of relying on a global connection.
    with closing(duckdb.connect(db_path)) as db:
        df = db.execute("SELECT t.tx_id, t.account_id, t.amount, t.currency, t.merchant, t.description FROM transactions t LEFT JOIN llm_results l ON t.tx_id = l.tx_id WHERE l.id IS NULL LIMIT ?", [max_items]).df()
        tx_list = df.to_dict(orient='records')
        print('Parallel runner: txs to process =', len(tx_list))
        results = []
        with ThreadPoolExecutor(max_workers=workers) as exe:
            futures = [exe.submit(worker, tx) for tx in tx_list]
            for fut in as_completed(futures):
                try:
                    results.append(fut.result())
                except Exception as e:
                    print('Worker error', e)
        inserted = bulk_insert_llm_results(db, results)
    print('Parallel inserted', inserted, 'rows')
    return inserted

print('Parallel runner defined')


Parallel runner defined


In [9]:
# Orchestrator: pick mode and run with safety checks
# Resolve the effective mode: an explicit PROCESS_MODE wins; 'auto' picks
# 'sequential' on hosts reporting less than 8 GB of memory and 'batch' otherwise
# (including when the memory size is unknown).
if PROCESS_MODE != 'auto':
    conservative_mode = PROCESS_MODE
else:
    mem = get_total_memory_gb() or 0
    conservative_mode = 'sequential' if (mem and mem < 8) else 'batch'

print('Orchestrator selected mode:', conservative_mode)

if conservative_mode == 'parallel':
    inserted = process_parallel_and_insert(DB_PATH, workers=PARALLEL_WORKERS)
elif conservative_mode == 'batch':
    inserted = process_batches_and_insert(DB_PATH, mode='batch', batch_size=BATCH_SIZE)
else:
    # Any other value runs sequentially: the batch runner with one transaction per call.
    inserted = process_batches_and_insert(
        DB_PATH,
        mode='sequential',
        batch_size=1,
        min_batch=1,
        max_timeout=SEQUENTIAL_TIMEOUT,
    )
print('Done — total rows inserted (approx):', inserted)


Orchestrator selected mode: batch
[]


CatalogException: Catalog Error: Table with name transactions does not exist!
Did you mean "duckdb_constraints"?

LINE 3:     FROM transactions t
                 ^

In [None]:
# Repair: reprocess rows with NULL/NaN risk_score using the same logic (attempt batch then per-item)
def repair_missing(db_path, limit=200):
    """Re-score llm_results rows whose risk_score is NULL or NaN, one row at a time.

    Args:
        db_path: path of the DuckDB database file to open.
        limit: maximum number of rows to reprocess in this pass.

    Each candidate is re-sent to the LLM; the row is then updated in place with
    the new response, parsed score and needs_review flag. Rows without a tx_id
    are skipped, and rows whose LLM call fails are left untouched.
    """
    import math
    from contextlib import closing

    with closing(duckdb.connect(db_path)) as con:
        # isnan() is used because `x != x` does not identify NaN in DuckDB,
        # where NaN compares equal to itself.
        to_reprocess = con.execute("""
    SELECT l.id as llm_id, l.tx_id, t.account_id, t.amount, t.currency, t.merchant, t.description
    FROM llm_results l
    LEFT JOIN transactions t ON l.tx_id = t.tx_id
    WHERE l.risk_score IS NULL OR isnan(l.risk_score)
    LIMIT ?
    """, [limit]).df()
        print('Repair candidates:', len(to_reprocess))
        for row in to_reprocess.to_dict(orient='records'):
            llm_id = row['llm_id']
            tx_id = row['tx_id']
            # pd.isna also catches NaN, which is how pandas represents a NULL
            # in a numeric column.
            if tx_id is None or pd.isna(tx_id):
                continue
            prompt = f"Transaction: account={row['account_id']} amount={row['amount']} {row['currency']} merchant={row['merchant']} description={row['description']}\n\nReturn a JSON object with tx_id and risk_score between 0 and 1 and a short explanation."
            try:
                parsed_obj, text = call_ollama_non_stream(prompt, timeout=SEQUENTIAL_TIMEOUT)
                # reply_text is reset for every row so a response from an
                # earlier row can never be written to this one.
                reply_text = text if isinstance(text, str) else ''
                raw_score = None
                if isinstance(parsed_obj, dict):
                    # Explicit None test so a legitimate score of 0 is kept.
                    for key in ('risk_score', 'score'):
                        if parsed_obj.get(key) is not None:
                            raw_score = parsed_obj[key]
                            break
                if raw_score is None:
                    assembled, _raw_lines = call_ollama_stream(prompt, timeout=SEQUENTIAL_TIMEOUT)
                    reply_text = assembled
                    raw_score = extract_final_text_from_response(assembled)
            except Exception as e:
                print('Repair call failed for', llm_id, e)
                continue
            parsed_val = parse_risk_score(raw_score)
            needs_review = math.isnan(parsed_val) or parsed_val < 0 or parsed_val > 1
            parsed_db = None if needs_review else float(parsed_val)
            con.execute("""
            UPDATE llm_results
            SET llm_response = ?, parsed_response = ?, risk_score = ?, needs_review = ?
            WHERE id = ?
        """, (reply_text, json.dumps({'parsed_risk': parsed_db}), parsed_db, needs_review, llm_id))
            print('Repaired', llm_id, '->', parsed_db, 'needs_review=', needs_review)
    print('Repair pass complete')

print('Repair function ready')


In [None]:
# Final diagnostics
# Summary counts for the transactions and llm_results tables.
con = duckdb.connect(DB_PATH)
print('Total transactions:', con.execute('SELECT COUNT(*) FROM transactions').fetchone()[0])
print('Total llm_results rows:', con.execute('SELECT COUNT(*) FROM llm_results').fetchone()[0])
# isnan() is used because `x != x` does not identify NaN in DuckDB, where NaN
# compares equal to itself.
print('Missing/NaN risk_score count:', con.execute("SELECT COUNT(*) FROM llm_results WHERE risk_score IS NULL OR isnan(risk_score)").fetchone()[0])
print('Needs review count:', con.execute("SELECT COUNT(*) FROM llm_results WHERE needs_review = TRUE").fetchone()[0])


Notes, tips and next steps:
- The notebook implements three approaches: batching (recommended), parallel small-concurrency workers, and sequential per-item fallback. It adapts to failures by reducing batch size and ultimately doing per-item reprocessing.
- If you continue to see long batch durations or timeouts, try:
  - Decreasing BATCH_SIZE to 2–4
  - Using a smaller model (set the LLM_MODEL env var to, for example, `phi:2.7b` or `gemma:2b`)
  - Increasing timeouts for slow CPU inference hosts (MAX_BATCH_CALL_TIMEOUT)
  - Running fewer parallel workers or using sequential mode on low-memory Codespaces
- The orchestrator picks `batch` when host memory appears adequate; if memory is low it uses `sequential`. You can override with PROCESS_MODE.
- Suggested next step: commit this notebook to a feature branch and open a PR for review; decide on the base branch and the PR branch name first.