In [1]:
# Enable IPython's autoreload extension in mode 2, which re-imports all
# modules before each cell executes, so edits to the locally imported
# extract module take effect without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import sys
import os
import shutil
import subprocess
import time
import requests
import torch
from pathlib import Path

# Make <project_root>/src/transform importable so `extract` can be imported.
# The project root is taken to be the parent of the current working directory.
project_root = Path.cwd().parent
script_dir = project_root / "src" / "transform"
script_dir_str = str(script_dir)
if script_dir_str not in sys.path:
    sys.path.append(script_dir_str)

# Import your optimized processor
from extract import MarkerFolderProcessor, configure_parallelism

In [3]:
# Paths
# Every data and Ollama location below is derived from the SCRATCH
# environment variable, so fail early with a clear message when it is missing
# instead of letting Path(None) raise an opaque TypeError.
scratch_env = os.environ.get("SCRATCH")
if not scratch_env:
    raise RuntimeError(
        "Environment variable SCRATCH is not set; "
        "export it before running this notebook."
    )
SCRATCH = Path(scratch_env)
INPUT_PDFS = SCRATCH / "mshauri-fedha/data/cbk/pdfs"            # folder handed to process_folder()
OUTPUT_DIR = SCRATCH / "mshauri-fedha/data/cbk/marker-output"   # passed as output_dir to the processor

# Ollama Setup
OLLAMA_HOME = SCRATCH / "ollama_core"
OLLAMA_BIN = OLLAMA_HOME / "bin/ollama"          # CLI binary used for serve / pull / create
# NOTE(review): OLLAMA_MODELS_DIR is defined here but not used by any cell
# visible in this notebook - confirm whether it should be exported to the server.
OLLAMA_MODELS_DIR = OLLAMA_HOME / "models"
OLLAMA_HOST = "http://localhost:11434"           # URL polled for readiness and given to the processor

print("‚úÖ Setup complete.")

✅ Setup complete.


In [4]:
# Unpack the three values returned by configure_parallelism() (imported from
# extract). workers_per_gpu and num_gpus are later passed to
# MarkerFolderProcessor.
total_slots, workers_per_gpu, num_gpus = configure_parallelism()

🔍 GH200/A100 Detected: 4 GPUs | 94.5 GB VRAM
⚙️  Stability Config: 5 workers/GPU | 20 Total Slots


In [5]:
# Kill any old server first. pkill's exit status is ignored (no check=True)
# and its stderr discarded, so "no matching process" is not treated as an error.
subprocess.run(["pkill", "-f", "ollama serve"], stderr=subprocess.DEVNULL)
time.sleep(2)  # short pause before launching the replacement server

server_env = os.environ.copy()
# Size server-side parallelism from the slot count computed by
# configure_parallelism() rather than a hard-coded number, so the value
# really does match the total worker slots on whatever node this runs on.
server_env["OLLAMA_NUM_PARALLEL"] = str(total_slots)
server_env["OLLAMA_MAX_LOADED_MODELS"] = "1"
server_env["OLLAMA_MAX_QUEUE"] = "2048"
# NOTE(review): OLLAMA_MODELS_DIR is defined in the setup cell but is not
# exported to the server here - confirm whether OLLAMA_MODELS should be set.

# Start new server. Its output is appended to a log file instead of being
# discarded, so a failed start can actually be diagnosed.
server_log_path = OLLAMA_HOME / "ollama_serve.log"
with open(server_log_path, "ab") as server_log:
    # The child process keeps its own copy of the file descriptor, so the
    # notebook-side handle can be closed as soon as Popen returns.
    process = subprocess.Popen(
        [str(OLLAMA_BIN), "serve"],
        stdout=server_log,
        stderr=subprocess.STDOUT,
        env=server_env,
    )
print(f"Ollama server log: {server_log_path}")

In [None]:
# Robust Wait Loop
# Poll the server URL until it answers HTTP 200, giving up after about 60 s.
print("Waiting for server heartbeat...")
server_ready = False
wait_deadline = time.monotonic() + 60  # Wait 60 seconds max
while time.monotonic() < wait_deadline:
    try:
        # Bounded request so a single hung connection cannot stall the cell.
        if requests.get(OLLAMA_HOST, timeout=2).status_code == 200:
            server_ready = True
            break
    except requests.exceptions.RequestException:
        # Server not accepting connections yet (or the request timed out).
        # Only request errors are caught, so Ctrl-C still interrupts the wait.
        pass
    # Pause after every unsuccessful attempt, including non-200 replies,
    # so the loop never spins through its budget without waiting.
    time.sleep(1)

if server_ready:
    print("‚úÖ Server is UP and listening!")
else:
    raise RuntimeError("Server failed to start. Check logs.")

⏳ Waiting for server heartbeat...
✅ Server is UP and listening!


In [None]:
# Model identifiers: the base Ollama tag and the name given to the
# 16k-context variant created from it.
BASE_MODEL = "qwen2.5:7b"
CUSTOM_MODEL_NAME = "qwen2.5-7b-16k"

# Pull the base model through the Ollama CLI; CLI output is discarded and a
# non-zero exit status raises CalledProcessError (check=True).
print(f" Checking/Pulling {BASE_MODEL}...")
pull_cmd = [str(OLLAMA_BIN), "pull", BASE_MODEL]
subprocess.run(
    pull_cmd,
    env=os.environ.copy(),
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
    check=True,
)

# Write a two-line Modelfile (FROM <base> + num_ctx 16384) into the current
# working directory.
print(f"Creating '{CUSTOM_MODEL_NAME}' (16k Context)...")
modelfile_content = "\n".join((f"FROM {BASE_MODEL}", "PARAMETER num_ctx 16384"))
with open("Modelfile_qwen_16k", "w") as modelfile:
    modelfile.write(modelfile_content)

⬇️  Checking/Pulling qwen2.5:7b...
📝 Creating 'qwen2.5-7b-16k' (16k Context)...


In [None]:
# Create the custom model from the Modelfile written by the previous cell.
# The filename must be exactly the one written there ("Modelfile_qwen_16k");
# the path is relative, so it is resolved against the current working directory.
# check=True raises CalledProcessError if `ollama create` fails.
subprocess.run(
    [str(OLLAMA_BIN), "create", CUSTOM_MODEL_NAME, "-f", "Modelfile_qwen_16k"],
    check=True,
    stdout=subprocess.DEVNULL,
    env=os.environ.copy()
)
print("Model Ready.")

✅ Model Ready.


[?2026h[?25l[1Ggathering model components [K
using existing layer sha256:2bada8a7450677000f678be90653b85d364de7db25eb5ea54136ada5f3933730 [K
using existing layer sha256:66b9ea09bd5b7099cbb4fc820f31b575c0366fa439b08245566692c6784e281e [K
using existing layer sha256:eb4402837c7829a690fa845de4d7f3fd842c2adee476d5341da8a46ea9255175 [K
using existing layer sha256:832dd9e00a68dd83b3c3fb9f5588dad7dcf337a0db50f7d9483f310cd292e92e [K
using existing layer sha256:db8fbfd0cb288a053f83ac9014ca9bac2558b1bbcd80b5c408a548e7acba8a24 [K
writing manifest ‚†ã [K[?25h[?2026l[?2026h[?25l[A[A[A[A[A[A[1Ggathering model components [K
using existing layer sha256:2bada8a7450677000f678be90653b85d364de7db25eb5ea54136ada5f3933730 [K
using existing layer sha256:66b9ea09bd5b7099cbb4fc820f31b575c0366fa439b08245566692c6784e281e [K
using existing layer sha256:eb4402837c7829a690fa845de4d7f3fd842c2adee476d5341da8a46ea9255175 [K
using existing layer sha256:832dd9e00a68dd83b3c3fb9f5588dad7dcf337a0db

In [9]:
# Switch the working directory to SCRATCH; relative paths used from here on
# resolve against it. Note that project_root in the import cell is derived
# from os.getcwd(), so re-running that cell after this one yields a different
# project root.
os.chdir(SCRATCH)

In [None]:
# Build the Marker processor: output location, Ollama endpoint/model, and the
# worker/GPU counts obtained earlier from configure_parallelism().
processor_kwargs = dict(
    output_dir=OUTPUT_DIR,
    ollama_url=OLLAMA_HOST,
    ollama_model=CUSTOM_MODEL_NAME,
    batch_multiplier=4,
    workers_per_gpu=workers_per_gpu,
    num_gpus=num_gpus,
)
processor = MarkerFolderProcessor(**processor_kwargs)

# Process every PDF under INPUT_PDFS, five files per batch.
print(f"üöÄ Processing PDFs from: {INPUT_PDFS}")
processor.process_folder(INPUT_PDFS, batch_size=5)

✅ Detected 4 GPUs (Dynamic Mode)
🚀 Processing PDFs from: /capstor/scratch/cscs/tligawa/mshauri-fedha/data/cbk/pdfs
📦 Created 1089 batches of 5 files each.
🚀 Launching 20 workers on 4 GPUs...


20:26:57 - [GPU-3:Dev3] - Initializing Worker 3...
20:26:58 - [GPU-0:Dev0] - Initializing Worker 0...
20:27:05 - [GPU-1:Dev1] - Initializing Worker 1...
20:27:06 - [GPU-2:Dev2] - Initializing Worker 2...
20:27:09 - [GPU-4:Dev0] - Initializing Worker 4...
20:27:11 - [GPU-5:Dev1] - Initializing Worker 5...
20:27:12 - [GPU-6:Dev2] - Initializing Worker 6...
20:27:12 - [GPU-9:Dev1] - Initializing Worker 9...
20:27:14 - [GPU-7:Dev3] - Initializing Worker 7...
20:27:15 - [GPU-8:Dev0] - Initializing Worker 8...
