In [5]:
import io
import json

import numpy as np
import sagemaker
import boto3
from torch.backends.opt_einsum import strategy
from torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook import batched_powerSGD_hook

# 2) Hard‑code (or read from env var) the execution‑role ARN you created
role = "arn:aws:iam::371087393859:role/defaultrole"
bucket = "ir-sagemaker"
session = boto3.Session(profile_name="lprofile", region_name="us-east-1")

sm_session = sagemaker.Session(boto_session=session, default_bucket=bucket)

In [6]:

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sm_session.default_bucket()}")
print(f"sagemaker session region: {sm_session.boto_region_name}")


sagemaker role arn: arn:aws:iam::371087393859:role/defaultrole
sagemaker bucket: ir-sagemaker
sagemaker session region: us-east-1


In [7]:
import sys
print(sys.version)

3.11.13 (main, Jun  5 2025, 13:12:00) [GCC 11.2.0]


In [8]:
from sagemaker.s3 import S3Uploader
bucket = sm_session.default_bucket()
prefix = "modernbert"

train_uri = f"s3://{bucket}/{prefix}/train/train.jsonl"
val_uri   = f"s3://{bucket}/{prefix}/val/val.jsonl"
test_uri  = f"s3://{bucket}/{prefix}/test/test.jsonl"
target_set_uri  = f"s3://{bucket}/{prefix}/target/target_set.jsonl"

In [5]:
train_uri = S3Uploader.upload("modernbert/data/train/train.jsonl", f"s3://{bucket}/{prefix}/train")
val_uri = S3Uploader.upload("modernbert/data/val/val.jsonl",   f"s3://{bucket}/{prefix}/val")
test_uri = S3Uploader.upload("modernbert/data/test/test.jsonl", f"s3://{bucket}/{prefix}/test")
target_set_uri = S3Uploader.upload("modernbert/data/target/target_set.jsonl", f"s3://{bucket}/{prefix}/target_set")

In [18]:
from sagemaker.huggingface import HuggingFace

metric_definitions=[
    {'Name': 'loss', 'Regex': "'loss': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'learning_rate', 'Regex': "'learning_rate': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_loss', 'Regex': "'eval_loss': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_accuracy', 'Regex': "'eval_accuracy': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_f1', 'Regex': "'eval_f1': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_precision', 'Regex': "'eval_precision': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_recall', 'Regex': "'eval_recall': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_runtime', 'Regex': "'eval_runtime': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'eval_samples_per_second', 'Regex': "'eval_samples_per_second': ([0-9]+(.|e\-)[0-9]+),?"},
    {'Name': 'epoch', 'Regex': "'epoch': ([0-9]+(.|e\-)[0-9]+),?"}]

hyper = {"learning_rate":3e-5,
         "num_train_epochs":1,
         "max_steps":2,
         "temperature":0.05,
         "deepspeed": "ds_zero3.json"}

est = HuggingFace(
    entry_point="train_sm.py",
    source_dir="modernbert",
    role=role,
    instance_type="ml.g5.12xlarge",
    instance_count=1,
    distribution={"mpi": {"enabled": True}},
    transformers_version="4.49.0", pytorch_version="2.5.1", py_version="py311",
    hyperparameters=hyper,
    metric_definitions=metric_definitions,
    environment={
        "PYTORCH_CUDA_ALLOC_CONF": "expandable_segments:True",
        "NCCL_DEBUG": "INFO"
    },
    output_path=f"s3://{bucket}/{prefix}/outputs"
)

In [19]:
est.fit({"train": train_uri, "val": val_uri, "test": test_uri})

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker:Creating training-job with name: huggingface-pytorch-training-2025-08-13-17-23-57-467


2025-08-13 17:23:59 Starting - Starting the training job
2025-08-13 17:23:59 Pending - Training job waiting for capacity......
2025-08-13 17:24:45 Pending - Preparing the instances for training...
2025-08-13 17:25:16 Downloading - Downloading the training image...........................
2025-08-13 17:29:49 Training - Training image download completed. Training in progress....[34mbash: cannot set terminal process group (-1): Inappropriate ioctl for device[0m
[34mbash: no job control in this shell[0m
[34mCUDA compat package should be installed for NVIDIA driver smaller than 550.163.01[0m
[34mCurrent installed NVIDIA driver version is 570.172.08[0m
[34mSkipping CUDA compat setup as newer NVIDIA driver is installed[0m
[34m2025-08-13 17:30:30,042 sagemaker-training-toolkit INFO     Imported framework sagemaker_pytorch_container.training[0m
[34m2025-08-13 17:30:30,080 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2025-08-13 17

In [46]:
from sagemaker.analytics import TrainingJobAnalytics

df = TrainingJobAnalytics(training_job_name='huggingface-pytorch-training-2025-07-30-03-59-45-613').dataframe()
print(df)




    timestamp              metric_name      value
0         0.0                     loss   3.360500
1       180.0                     loss   2.731600
2         0.0            learning_rate   1.721739
3       180.0            learning_rate   4.173913
4         0.0            eval_accuracy   0.031250
5         0.0                  eval_f1   0.019156
6         0.0           eval_precision   0.022735
7         0.0              eval_recall   0.031250
8         0.0             eval_runtime  21.798900
9         0.0  eval_samples_per_second  22.157000
10        0.0                    epoch   0.430000
11      180.0                    epoch   0.870000
12      240.0                    epoch   1.000000


#### Model Evaluation

In [9]:
from sagemaker.huggingface import HuggingFaceModel
from sklearn.metrics import f1_score
import numpy as np
import tqdm
import faiss
import uuid
import sagemaker
import torch

print("numpy version:", np.__version__)
print("tqdm version:", tqdm.__version__)
print("torch version:", torch.__version__)
print("faiss version:", faiss.__version__)

numpy version: 1.26.4
tqdm version: 4.67.1
torch version: 2.8.0+cu128
faiss version: 1.9.0


In [10]:
s3 = boto3.client("s3")
model_data = f"s3://{bucket}/{prefix}/outputs/huggingface-pytorch-training-2025-08-13-17-23-57-467/output/model.tar.gz"
run_id = uuid.uuid4().hex
input_key = f"{prefix}/target_set/target_set.jsonl"
output_key = f"{prefix}/batch-output/{run_id}.jsonl"

body = s3.get_object(Bucket=bucket, Key=input_key)["Body"].read().decode("utf-8")
j = 0
out_buf = io.StringIO()
for line in body.splitlines():
    #if j == 1:
    #    break
    #j += 1
    rec = json.loads(line)
    doc_id = rec["doc_id"]
    for i, sent in enumerate(rec["case_text"]):
        #if i == 3:
         #   break
        out_buf.write(json.dumps({"docid": doc_id, "sentid": i, "inputs": sent}) + "\n")
        # print(j,i)

s3.put_object(Bucket=bucket, Key=output_key, Body=out_buf.getvalue().encode("utf-8"))
sentences_set_uri = f"s3://{bucket}/{output_key}"
print(sentences_set_uri)

s3://ir-sagemaker/modernbert/batch-output/f84d458b9c8b4cff8b8db170fca64a6c.jsonl


In [None]:
output_prefix = f"{prefix}/embedded-output/{run_id}"
print(sentences_set_uri)
# create model class
huggingface_model = HuggingFaceModel(
    model_data=model_data,
    role=role,
    transformers_version="4.49.0",
    pytorch_version="2.6.0",
    py_version="py312",
    entry_point="inference.py",
    source_dir="code",
    env={
        "TOWER": "suffix",
        "POOLING": "mean",
        "NORMALIZE": "true",
        "MAX_LEN": "512",
    }
)

transformer = huggingface_model.transformer(
    instance_count=1,
    instance_type="ml.g5.2xlarge",
    output_path=f"s3://{bucket}/{output_prefix}",
    assemble_with="Line",
    accept="application/jsonlines",
    strategy="MultiRecord",
    max_payload=1,
    max_concurrent_transforms=2,
    env={"TS_MAX_RESPONSE_SIZE": "100000000",
         "TS_MAX_REQUEST_SIZE":  "100000000",},
)

transformer.transform(
    data=sentences_set_uri,
    content_type="application/jsonlines",
    split_type="Line",
    input_filter="$.inputs",
    join_source="Input",
)
transformer.wait()

resp = s3.list_objects_v2(Bucket=bucket, Prefix=output_prefix)
out_keys = [o["Key"] for o in resp.get("Contents", []) if o["Key"].endswith(".out")]
assert out_keys, f"No output files under s3://{bucket}/{output_prefix}"

sent_embeddings = {}

for k in out_keys:
    obj = s3.get_object(Bucket=bucket, Key=k)
    for raw in obj["Body"].iter_lines():
        if not raw:
            continue
        rec = json.loads(raw)

        # after join_source="Input", your metadata is intact and the prediction is here:
        emb = rec["SageMakerOutput"]["embedding"]     # shape depends on your model/inference.py

        # Build a stable key (doc_id + sent_id)
        key = f'{rec["docid"]}:{rec["sentid"]}'
        sent_embeddings[key] = np.array(emb, dtype="float32")

s3://ir-sagemaker/modernbert/batch-output/f84d458b9c8b4cff8b8db170fca64a6c.jsonl


In [15]:
import pprint

sm = boto3.client("sagemaker")

job_name = "huggingface-pytorch-inference-2025-08-17-20-24-43-053"
resp = sm.describe_transform_job(TransformJobName=job_name)

# Print the bits that matter for JSONL + join
pp = pprint.PrettyPrinter(indent=2)
pp.pprint({
    "Status": resp["TransformJobStatus"],
    "FailureReason": resp.get("FailureReason"),
    "BatchStrategy": resp.get("BatchStrategy"),
    "MaxPayloadInMB": resp.get("MaxPayloadInMB"),
    "TransformInput": {
        "ContentType": resp["TransformInput"].get("ContentType"),
        "SplitType": resp["TransformInput"].get("SplitType"),
    },
    "TransformOutput": {
        "Accept": resp["TransformOutput"].get("Accept"),
        "AssembleWith": resp["TransformOutput"].get("AssembleWith"),
        "S3OutputPath": resp["TransformOutput"]["S3OutputPath"],
    },
    "DataProcessing": resp.get("DataProcessing"),  # includes JoinSource, OutputFilter
})

In [33]:
resp = s3.list_objects_v2(Bucket=bucket, Prefix=output_prefix)
out_keys = [o["Key"] for o in resp.get("Contents", []) if o["Key"].endswith(".out")]
assert out_keys, f"No output files under s3://{bucket}/{output_prefix}"

sent_embeddings = {}

for k in out_keys:
    obj = s3.get_object(Bucket=bucket, Key=k)
    for raw in obj["Body"].iter_lines():
        if not raw:
            continue
        rec = json.loads(raw)

        # after join_source="Input", your metadata is intact and the prediction is here:
        emb = rec["embedding"]

        key = f'{rec["docid"]}:{rec["sentid"]}'
        sent_embeddings[key] = np.array(emb, dtype="float32")

print(sent_embeddings)


{'A2016_European_Commission_v_World_Duty_Free:0': array([ 2.96334340e-03, -4.57810657e-03, -6.03524968e-03,  7.31856190e-03,
       -8.56655836e-03, -6.94716908e-03, -3.47589375e-03, -6.84461324e-03,
       -5.52418455e-03,  2.63813697e-03,  4.52040089e-03,  7.85660371e-03,
       -4.01485851e-03,  2.65545258e-03,  2.57724710e-03, -2.26619653e-03,
        1.61755388e-03,  7.75666535e-03,  3.90741305e-04, -1.36289571e-03,
        1.00031858e-02,  2.83362751e-04,  8.95528123e-03,  3.11931071e-04,
       -3.84916062e-03, -4.70993016e-03, -4.46944311e-03,  2.82229879e-03,
        8.38186964e-03,  4.94436268e-03,  3.38223227e-03,  1.13856858e-02,
       -2.42447364e-03,  2.77312007e-03,  8.80937371e-03,  3.13100452e-03,
       -2.38765636e-03,  1.15786744e-02,  4.72873123e-03, -7.73071684e-03,
       -2.03685230e-03,  5.07178309e-04,  2.36486504e-03, -4.66394471e-03,
       -2.40622554e-03,  4.10904037e-03,  9.32672527e-03,  3.80355702e-03,
       -3.97333689e-03,  1.13462415e-02,  1.750452

In [34]:
doc_to_int, int_to_doc = {}, {}
next_doc_int = 1

ids = []
vecs = []

for key, vec in sent_embeddings.items():
    doc_id, sent_id_str = key.split(":", 1)
    sent_id = int(sent_id_str)

    if doc_id not in doc_to_int:
        doc_to_int[doc_id] = next_doc_int
        int_to_doc[next_doc_int] = doc_id
        next_doc_int += 1

    did = np.int64(doc_to_int[doc_id])
    fid = (did << np.int64(32)) | np.int64(sent_id)
    ids.append(fid)
    vecs.append(vec.astype("float32"))

X = np.vstack(vecs).astype("float32")

# (optional) cosine via inner product
faiss.normalize_L2(X)

base = faiss.IndexFlatIP(X.shape[1])
index = faiss.IndexIDMap2(base)
index.add_with_ids(X, np.asarray(ids, dtype="int64"))

In [None]:

# Evaluation

k_list = [1, 5, 10, 20]
k_max = max(k_list)

hits_at_k = {k:0 for k in k_list}
f1_gold, f1_pred = [], []

start = 0
for rec in tqdm.tqdm(test_records, desc="evaluate"):
    end = sent_offsets.pop(0)
    doc_sentence_slice = slice(start, end)
    start = end

    prefix_vec = np.array(
        prefix_predictor.predict({"inputs": [rec["prefix"]]}),
        dtype = "float32",
    )

    # Check this masking because it is not the same exactly
    mask_same_sentence = [
        sent in rec["prefix"] for sent in rec["sentences"]
    ]
    mast_same_sentence = np.array(mask_same_sentence, dtype=bool)

    # ids of candidates to keep
    keep_ids = np.where(~mast_same_sentence)[0] + doc_sentence_slice.start
    keep_vecs = all_vecs[keep_ids]

    # faiss index
    D, I = index.search(prefix_vec, k_max)

    # drop candidates with global id not in keep_ids
    valid = [i for i in I[0] if i in keep_ids][:k_max]
    if len(valid) < k_max:
        extra = [i for i in I[0] if i not in keep_ids]
        valid.extend(extra[: k_max - len(valid)])

    # Compute metrix
    for k in k_list:
        if any(all_sentences[i] == rec["positive"] for i in valid[:k]):
            hits_at_k[k] += 1

    # Binary F1
    pred1 = [1 if all_sentences[i] == rec["positive"] else 0 for i in valid]
    f1_gold.append([1] + [0]*(len(pred1) -1))
    f1_pred.append(pred1)

# Aggregate metrics
n = len(test_records)
recall = {k: hits_at_k[k] / n for k in k_list}

# macro f1
f1_scores = [
    f1_score(g, p, zero_division=0) for g, p in zip(f1_gold, f1_pred)
]
f1_macro = {k: np.mean([f1_scores[i] for i in range(n)]) for k in k_list}

print("Recall:", recall)
print("Macro F1:", f1_macro)

In [None]:
import sys
import os

module_directory = os.path.abspath('code/')

# Add the directory to the Python path
sys.path.append(module_directory)

from inference import model_fn, transform_fn
m = model_fn("model")
body = json.dumps({"inputs": "A quick test sentence."})
out, ctype = transform_fn(m, body, "application/json", "application/json")
print(ctype, out[:80], "...")

In [85]:

import json, math, sys, re
from typing import Any, Iterable, Tuple, Optional

# --- core checks ---
CONTROL_CHARS_RE = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F]")  # raw control chars (must be escaped in JSON text)

def _check_numbers(x: Any, path: str="$") -> Optional[Tuple[str, float]]:
    """Return (path, value) if a non-finite number is found anywhere."""
    if isinstance(x, float):
        if not math.isfinite(x):
            return (path, x)
    elif isinstance(x, (list, tuple)):
        for i, v in enumerate(x):
            hit = _check_numbers(v, f"{path}[{i}]")
            if hit: return hit
    elif isinstance(x, dict):
        for k, v in x.items():
            hit = _check_numbers(v, f"{path}.{k}")
            if hit: return hit
    return None

def _check_schema(obj: Any) -> str:
    """Return an error string or '' if OK for schema:
       {"doc_id": str, "sent_id": int, "inputs": str}"""
    if not isinstance(obj, dict):
        return "record is not a JSON object"
    for k in ("docid", "sentid", "inputs"):
        if k not in obj:
            return f"missing required key: {k}"
    if not isinstance(obj["docid"], str) or not obj["docid"]:
        return "docid must be a non-empty string"
    if not isinstance(obj["sentid"], int):
        return "sentid must be an integer"
    if not isinstance(obj["inputs"], str) or not obj["inputs"]:
        return "inputs must be a non-empty string"
    if CONTROL_CHARS_RE.search(obj["docid"]):
        return "docid contains raw control characters"
    # (inputs may legitimately contain \n, \t etc as escaped sequences in JSON; the decoder handles that)
    return ""

def validate_jsonl_lines(lines: Iterable[bytes]) -> None:
    """Validate a JSONL stream (bytes per line). Prints first problem found and exits(1); otherwise prints OK."""
    # Detect BOM on very first bytes
    first = True
    for i, raw in enumerate(lines, 1):
        if not raw:
            # skip blank lines; JSON Lines allows them but many pipelines don’t expect them
            continue
        if first:
            first = False
            if raw.startswith(b"\xef\xbb\xbf"):  # UTF-8 BOM
                print(f"Line {i}: UTF-8 BOM detected; prefer UTF-8 without BOM.", file=sys.stderr)

        # Strict UTF-8; fail fast on decoding errors
        try:
            s = raw.decode("utf-8", "strict")
        except UnicodeDecodeError as e:
            print(f"Line {i}: UTF-8 decode error: {e}", file=sys.stderr)
            sys.exit(1)

        # Must be a single JSON value per line (no trailing commas etc.)
        try:
            rec = json.loads(s)
        except json.JSONDecodeError as e:
            head = s[:120].replace("\n", "\\n")
            print(f"Line {i}: invalid JSON ({e.msg}) at pos {e.pos}. Head: {head!r}", file=sys.stderr)
            sys.exit(1)

        # Schema & numeric sanity
        err = _check_schema(rec)
        if err:
            print(f"Line {i}: {err}", file=sys.stderr)
            sys.exit(1)

        hit = _check_numbers(rec)
        if hit:
            path, val = hit
            print(f"Line {i}: non-finite number at {path}: {val!r} (JSON forbids NaN/Infinity).", file=sys.stderr)
            sys.exit(1)

    print("Input looks good: UTF-8 OK, valid JSON Lines, schema OK, no NaN/Infinity found.")

# --- usage examples ---

# 1) Local file:
# with open("your_input.jsonl", "rb") as f:
#     validate_jsonl_lines(f)

# 2) S3 object:
# import boto3
# s3 = boto3.client("s3")
# obj = s3.get_object(Bucket="your-bucket", Key="path/to/input.jsonl")
# validate_jsonl_lines(obj["Body"].iter_lines())


import boto3, json

key = f"{prefix}/batch-output/{run_id}.jsonl"

s3 = boto3.client("s3")
obj = s3.get_object(Bucket=bucket, Key=key)
validate_jsonl_lines(obj["Body"].iter_lines())



Input looks good: UTF-8 OK, valid JSON Lines, schema OK, no NaN/Infinity found.
