# Importing modules

In [5]:
import random
import json
import re
from pathlib import Path
from datetime import datetime
from tqdm import tqdm
from collections import defaultdict

from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from typing import List, Dict

# Defining prompt templates

In [6]:
# System prompt shared by every generation request.
# The few-shot example at the bottom must itself follow the requested format:
# strictly valid JSON (no trailing commas) with every mountain name wrapped in
# <mon>...</mon>, because parse() feeds the reply to json.loads and extracts
# entity spans from those tags.
SYSTEM_PROMPT = SystemMessage(content="""
You are a dataset generator for named entity recognition (NER). Your task is to generate 5 unique, natural-sounding English sentences.
Return your output **ONLY** as a JSON array. Do not wrap it in Markdown code fences and do not add any commentary.

Requirements:
1. Each sentence may include one or more mountain names from the provided list.
2. Every mountain in the input list must appear **at least once** across the 5 sentences.
3. Wrap every mountain name exactly with <mon> and </mon> tags. Example:
   "The climbers scaled <mon>Mount Everest</mon> and rested near <mon>K2</mon>."
4. Do NOT include start or end character indices—only inline tags.
5. Use diverse forms of mountain names: "Mount X", "Mt. X", "X", "Monte X", or local variants.
6. Include sentences with punctuation, parentheses, lists ("... and ..."), abbreviations ("Mt."), and possessive forms ("K2's prominence").
7. Include some negative examples: sentences may contain the word "mountain" metaphorically or place names that are not mountains, but do NOT wrap them in <mon> tags.
8. Include a few sentences with **no mountain names at all**.

Example:
[
  {
    "text": "The climbers scaled <mon>Mount Everest</mon> and rested near <mon>K2</mon>."
  },
  {
    "text": "After conquering <mon>Kangchenjunga</mon>, they turned their attention to <mon>Lhotse</mon>."
  }
]
""")

# Per-request user prompt; `mountains` is filled with the current batch of names.
HUMAN_PROMPT_TEMPLATE = PromptTemplate(
    input_variables=["mountains"],
    template="""
Generate 5 unique sentences using the following mountains: {mountains}.
"""
)

# Setting up the LLM (a local model served through an OpenAI-compatible endpoint)

In [7]:
# Alternative hosted backend, kept for reference:
#from langchain_google_genai import ChatGoogleGenerativeAI
#llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash", temperature=0.7)

# Chat client pointed at a server on localhost:8000 via the OpenAI-style /v1 route.
# The api_key is a placeholder string, not a real credential.
llm = ChatOpenAI(
    base_url="http://127.0.0.1:8000/v1",
    api_key="none",
    model="local"
)

# Fetching mountain names from Wikipedia

In [8]:
import requests
import lxml
from bs4 import BeautifulSoup

# Wikipedia page whose tables are scraped for mountain names.
URL = "https://en.wikipedia.org/wiki/List_of_mountains_by_elevation"
# Request headers sent with every fetch; supplies a browser-style User-agent string.
HEADERS = {
    "User-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
}

def fetch_mountains():
    """
    Scrape mountain names from the tables of the Wikipedia page at ``URL``.

    For every table body the first row is skipped (treated as the header) and
    the name is read from the first ``<td>`` cell of each remaining row: the
    ``title`` attribute of the cell's first link when present, otherwise the
    cell's visible text.

    Returns:
        list[str]: Mountain names in page order. Rows without data cells and
        rows that yield an empty name are skipped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    # Fail fast instead of hanging forever or silently parsing an error page.
    response = requests.get(URL, headers=HEADERS, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "lxml")

    mountains = []
    for table in soup.find_all("tbody"):
        for row in table.find_all("tr")[1:]:
            cells = row.find_all("td")
            if not cells:
                # Header-only / separator rows have no <td>; indexing would raise.
                continue
            link = cells[0].find("a")
            title = link.get("title") if link is not None else None
            # A link without a title attribute would otherwise produce None.
            name = title or cells[0].text.strip()
            if name:
                mountains.append(name)

    return mountains

def fetch_mountains_by_region():
    """
    Scrape mountain names from the Wikipedia page at ``URL``, grouped by region.

    For every table body the first row is skipped (treated as the header). The
    name comes from the first ``<td>`` cell (link ``title`` when present,
    otherwise the cell text) and the region is the stripped text of the last
    ``<td>`` cell of the same row.

    Returns:
        defaultdict[str, list[str]]: Region text mapped to the mountain names
        found for it. Rows without data cells and rows that yield an empty
        name are skipped.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    # Fail fast instead of hanging forever or silently parsing an error page.
    response = requests.get(URL, headers=HEADERS, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "lxml")

    mountains_by_region = defaultdict(list)
    for table in soup.find_all("tbody"):
        for row in table.find_all("tr")[1:]:
            cells = row.find_all("td")
            if not cells:
                # Header-only / separator rows have no <td>; indexing would raise.
                continue
            link = cells[0].find("a")
            title = link.get("title") if link is not None else None
            # A link without a title attribute would otherwise produce None.
            name = title or cells[0].text.strip()
            if not name:
                continue
            region = cells[-1].text.strip()
            mountains_by_region[region].append(name)

    return mountains_by_region

# Defining parse function

In [9]:
def parse(output: str):
    """
    Parse mountain tags from an LLM response into NER records.

    The response is expected to be a JSON array of objects with a "text" key
    whose value may contain ``<mon>...</mon>`` tags. The tags are removed and
    replaced by character spans into the cleaned text.

    Args:
        output (str): Raw response from LLM. If it is not valid JSON as-is,
            the content of the first Markdown code fence (```...```) is tried.

    Returns:
        list[dict]: One ``{"text": str, "entities": [...]}`` record per usable
        item, where each entity is ``{"start": int, "end": int,
        "label": "MOUNTAIN"}`` with ``end`` exclusive. Items that are not
        objects with a string "text" are skipped.

    Raises:
        json.JSONDecodeError: If no JSON can be decoded from the response or
            the decoded value is not an array.
    """
    try:
        content = json.loads(output)
    except json.JSONDecodeError:
        # Chat models frequently wrap JSON in a ```json ... ``` fence.
        fenced = re.search(r"```(?:[a-zA-Z0-9_+-]+)?\s*(.*?)```", output, flags=re.DOTALL)
        if fenced is None:
            raise
        content = json.loads(fenced.group(1))

    if not isinstance(content, list):
        # Reported as a decode error so callers handling bad LLM output see one exception type.
        raise json.JSONDecodeError("Expected a JSON array of objects", output, 0)

    tag_pattern = re.compile(r"<mon>(.*?)</mon>")
    result = []
    for elem in content:
        if not isinstance(elem, dict) or not isinstance(elem.get("text"), str):
            continue
        text = elem["text"]
        entities = []
        pieces = []
        clean_length = 0  # length of the tag-free text assembled so far
        last_index = 0

        # Parsing <mon></mon> tags and removing them
        for mon_match in tag_pattern.finditer(text):
            start, end = mon_match.span()
            name = mon_match.group(1)

            # Adding text before tag
            before = text[last_index:start]
            pieces.append(before)
            clean_length += len(before)

            # Adding name; an empty tag pair contributes no entity
            if name:
                entities.append({
                    "start": clean_length,
                    "end": clean_length + len(name),
                    "label": "MOUNTAIN"
                })
                pieces.append(name)
                clean_length += len(name)

            last_index = end

        # Adding the rest of the text
        pieces.append(text[last_index:])

        result.append({
            "text": "".join(pieces),
            "entities": entities
        })

    return result

# Dataset generation

- Takes consecutive batches of mountain names
- Sends each batch to the LLM
- Tries to parse the response
- Appends the parsed sentences to the dataset
- Periodically dumps the results to disk so that we do not lose them if something breaks

In [10]:
# Scrape the full list of mountain names once; the generation loop below slices it into batches.
mountains = fetch_mountains()

In [11]:
DUMP_PERIOD = 4 # LLM outputs in one dump
MOUNTAINS_BATCH_SIZE = 4 # batch size
MAX_PARSE_RETRIES = 3 # attempts per batch before the batch is skipped
DATASET_DIR = Path("dataset") # Dataset directory
DATASET_DIR.mkdir(parents=True, exist_ok=True)


def _dump_dataset(records):
    """Write `records` to a new timestamped JSON file in DATASET_DIR and return its path."""
    # Microseconds in the stamp keep two dumps within the same second from overwriting each other.
    stamp = datetime.now().strftime('%Y-%m-%d %H-%M-%S-%f')
    file_name = DATASET_DIR / f"ner_dataset_{stamp}.json"
    with open(file_name, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)
    return file_name


i = 0               # index of the first mountain of the current batch
parse_failures = 0  # consecutive parse failures for the current batch

dataset = []
with tqdm(total=len(mountains), initial=i, bar_format='{l_bar}{bar:40}{r_bar}') as pbar:
    try:
        while i < len(mountains):
            sampled_mountains = mountains[i:i + MOUNTAINS_BATCH_SIZE]

            human_prompt = HUMAN_PROMPT_TEMPLATE.format(mountains=sampled_mountains)
            response = llm.invoke([
                SYSTEM_PROMPT,
                HumanMessage(content=human_prompt)
            ])

            try:
                dataset.extend(parse(response.content))
                parse_failures = 0
            except (json.JSONDecodeError, KeyError, TypeError):
                parse_failures += 1
                tqdm.write(f"FAILED TO PARSE: {i}")
                if parse_failures < MAX_PARSE_RETRIES:
                    # Ask the model again for the same batch.
                    continue
                # Give up on this batch so one stubborn prompt cannot stall the run forever.
                tqdm.write(f"SKIPPING BATCH: {i}")
                parse_failures = 0

            # The threshold is counted in parsed sentences accumulated since the last dump.
            if len(dataset) >= MOUNTAINS_BATCH_SIZE * DUMP_PERIOD:
                file_name = _dump_dataset(dataset)
                pbar.set_description(f"DUMP: {file_name}")
                dataset = []

            pbar.update(len(sampled_mountains))
            i += len(sampled_mountains)
    except KeyboardInterrupt:
        print("Stopping, saving current dataset")
    finally:
        # Flush whatever is still in memory: the tail of a finished run, a manual
        # interrupt, or an unexpected error (which still propagates afterwards).
        if dataset:
            file_name = _dump_dataset(dataset)
            print(f"Saved: {file_name}")
            dataset = []

  0%|                                        | 0/1651 [00:01<?, ?it/s]

Stopping, saving current dataset





# Merge all dumps into one

In [15]:
# Destination of the merged dataset.
OUTPUT_PATH = Path("dataset.json")

# Concatenate every dump file into one list. Sorting the paths makes the merged
# order reproducible; the directory iteration order is otherwise unspecified.
result = []
for item in sorted(Path(DATASET_DIR).rglob("ner_dataset_*.json")):
    with open(item, "r", encoding="utf-8") as f:
        result.extend(json.load(f))

# Explicit encoding so the file is written the same way it is read back later.
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(result, f, indent=2)

# Defining auxiliary functions for dataset conversion

In [13]:
from transformers import AutoTokenizer
import os

# Tag strings for the single MOUNTAIN entity type: B- marks the first token of
# an entity, I- a continuation token, and O a token outside any entity.
LABEL_B = "B-MOUNTAIN"
LABEL_I = "I-MOUNTAIN"
LABEL_O = "O"

# Integer id assigned to each tag string; these ids are what the token-label lists store.
label2id = {LABEL_O: 0, LABEL_B: 1, LABEL_I: 2}


def load_json(path: Path) -> List[Dict]:
    """
    Read a UTF-8 JSON file whose top-level value is an array.

    Args:
        path (Path): File to read.

    Returns:
        List[Dict]: The decoded top-level list.

    Raises:
        ValueError: If the file contains only whitespace, or if the decoded
            top-level value is not a list.
    """
    raw = path.read_text(encoding="utf-8").strip()
    if raw == "":
        raise ValueError(f"Input file {path} is empty.")

    parsed = json.loads(raw)
    if not isinstance(parsed, list):
        raise ValueError("JSON top-level is not a list.")
    return parsed

def convert_spans_to_token_labels(data: List[Dict], tokenizer, max_length: int):
    """
    Convert character-span entity annotations into per-token BIO label ids.

    Args:
        data: Records with a "text" string and an "entities" list of dicts
            carrying integer "start"/"end" character offsets (end exclusive).
        tokenizer: Callable tokenizer that accepts ``return_offsets_mapping``,
            ``truncation`` and ``max_length`` and returns a mapping with
            "input_ids", "attention_mask" and "offset_mapping"; it must also
            provide ``convert_ids_to_tokens``.
        max_length: Maximum sequence length passed to the tokenizer.

    Returns:
        tuple: ``(out, problems)`` where ``out`` is a dict of parallel lists
          - input_ids, attention_mask, labels (list of ints same length as input_ids; -100 for ignored)
          - tokens (for inspection)
          - text (the original text of each example)
        and ``problems`` is a list of ``(example_index, "bad_entity_offsets", entity)``
        tuples for entities whose offsets are missing, non-integer or out of range.
    """
    out = {
        "input_ids": [],
        "attention_mask": [],
        "labels": [],
        "tokens": [],
        "text": []
    }
    problems = []
    for idx, item in enumerate(data):
        text = item.get("text", "")
        ents = item.get("entities") or []
        # Per-character entity number: 0 = outside, k = covered by the k-th entity (1-based).
        # Keeping the entity identity (not just a 0/1 flag) lets touching entities stay separate.
        char_entity = [0] * len(text)
        for ent_no, ent in enumerate(ents, start=1):
            s, e = ent.get("start"), ent.get("end")
            if not (isinstance(s, int) and isinstance(e, int) and 0 <= s < e <= len(text)):
                problems.append((idx, "bad_entity_offsets", ent))
                continue
            char_entity[s:e] = [ent_no] * (e - s)
        enc = tokenizer(
            text,
            return_offsets_mapping=True,
            truncation=True,
            max_length=max_length,
        )
        offsets = enc.pop("offset_mapping")
        tokens = tokenizer.convert_ids_to_tokens(enc["input_ids"])
        label_ids = []
        prev_entity = 0  # entity number of the previous labelled token, 0 if it was O
        for tok_start, tok_end in offsets:
            # special tokens: some fast tokenizers encode them with (0,0) offsets
            if tok_start == tok_end == 0:
                label_ids.append(-100)
                continue
            # ensure offsets inside text bounds
            tok_start = min(tok_start, len(text))
            tok_end = min(tok_end, len(text))
            if tok_start >= tok_end:
                label_ids.append(-100)
                continue
            # first entity touched by this token, if any
            entity = next((c for c in char_entity[tok_start:tok_end] if c), 0)
            if entity == 0:
                label_ids.append(label2id[LABEL_O])
            elif entity != prev_entity:
                # First token overlapping this entity, even when the entity begins
                # mid-token, so every entity opens with a B- tag.
                label_ids.append(label2id[LABEL_B])
            else:
                label_ids.append(label2id[LABEL_I])
            prev_entity = entity
        out["input_ids"].append(enc["input_ids"])
        out["attention_mask"].append(enc["attention_mask"])
        out["labels"].append(label_ids)
        out["tokens"].append(tokens)
        out["text"].append(text)
    return out, problems

def write_conll(out_prefix: str, tokens_list: List[List[str]], labels_list: List[List[int]], tokenizer):
    """
    Write tokens and their tags to ``<out_prefix>.conll`` for manual inspection.

    Each kept token becomes one ``token<TAB>tag`` line and every sentence is
    followed by a blank line. Tokens whose label is -100 are omitted; label ids
    that are neither the B nor the I id are written as "O".

    Args:
        out_prefix: Path prefix; ".conll" is appended to it.
        tokens_list: Token strings, one list per sentence.
        labels_list: Label ids parallel to ``tokens_list``.
        tokenizer: Accepted for call compatibility; not used here.

    Returns:
        str: Path of the written file.
    """
    # Only B and I need an explicit entry; everything else falls back to "O".
    id_to_tag = {label2id[LABEL_B]: LABEL_B, label2id[LABEL_I]: LABEL_I}
    target = Path(f"{out_prefix}.conll")
    with target.open("w", encoding="utf-8") as handle:
        for sentence_tokens, sentence_labels in zip(tokens_list, labels_list):
            rows = [
                f"{tok}\t{id_to_tag.get(lab, 'O')}\n"
                for tok, lab in zip(sentence_tokens, sentence_labels)
                if lab != -100
            ]
            handle.writelines(rows)
            handle.write("\n")
    return str(target)

def save_hf_dataset(out_prefix: str, tokenized: Dict):
    """
    Save the tokenized examples as an on-disk Hugging Face dataset.

    Args:
        out_prefix: Path prefix; the dataset is written to ``<out_prefix>.hf_dataset``.
        tokenized: Dict holding the "input_ids", "attention_mask", "labels"
            and "text" lists; other keys are not saved.

    Returns:
        str | None: Output directory, or None when importing ``datasets``
        fails (a hint is printed and nothing is written).
    """
    try:
        from datasets import Dataset, DatasetDict
    except Exception:
        print("`datasets` not installed; skipping saving Hugging Face dataset. Install with `pip install datasets` to enable.")
        return None

    saved_columns = ("input_ids", "attention_mask", "labels", "text")
    ds = Dataset.from_dict({column: tokenized[column] for column in saved_columns})

    # Save locally
    outdir = f"{out_prefix}.hf_dataset"
    ds.save_to_disk(outdir)
    return outdir

# Converting dataset

In [16]:
# Conversion settings.
INPUT_JSON_PATH = "dataset.json"
OUTPUT_PATH = "mountains_prepared"
MODEL = "bert-base-cased"
MAX_LENGTH = 128

# Load the merged span-annotated dataset.
inp = Path(INPUT_JSON_PATH)
assert inp.exists(), f"Input {inp} not found."
data = load_json(inp)
print(f"Loaded {len(data)} examples from {inp}")

# Tokenize and project the character spans onto tokens.
tokenizer = AutoTokenizer.from_pretrained(MODEL, use_fast=True)
tokenized, problems = convert_spans_to_token_labels(data, tokenizer, MAX_LENGTH)
print(f"Converted {len(tokenized['input_ids'])} examples to token labels. Problematic entities: {len(problems)}")
if problems:
    print("Sample problem:", problems[:3])

# Human-readable CoNLL file for eyeballing the labels.
conll_path = write_conll(OUTPUT_PATH, tokenized["tokens"], tokenized["labels"], tokenizer)
print(f"Wrote inspection CoNLL to: {conll_path}")

# Hugging Face dataset on disk (skipped when `datasets` is unavailable).
outdir = save_hf_dataset(OUTPUT_PATH, tokenized)
if outdir:
    print(f"Hugging Face dataset saved to: {outdir}")

# Show the first few examples as token:tag pairs, leaving out ignored (-100) positions.
id2label = {v: k for k, v in label2id.items()}
n_show = min(3, len(tokenized["tokens"]))
print("\n=== Example token/label outputs (first %d) ===" % n_show)
for sample_text, toks, labs in zip(
    tokenized["text"][:n_show],
    tokenized["tokens"][:n_show],
    tokenized["labels"][:n_show],
):
    print(f"\nTEXT: {sample_text}")
    print(" ".join(f"{t}:{id2label.get(l,'-')}" for t, l in zip(toks, labs) if l != -100))

# Mapping in the string-keyed form printed for the training configuration.
print("\nLabel mapping for Trainer:")
id2label_trainer = {str(v): k for k, v in label2id.items()}
print("label2id:", label2id)
print("id2label:", id2label_trainer)
print("\nDone.")

Loaded 2491 examples from dataset.json




Converted 2491 examples to token labels. Problematic entities: 0
Wrote inspection CoNLL to: mountains_prepared.conll


Saving the dataset (1/1 shards): 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2491/2491 [00:00<00:00, 401815.68 examples/s]

Hugging Face dataset saved to: mountains_prepared.hf_dataset

=== Example token/label outputs (first 3) ===

TEXT: After conquering Mount Eolus, the hikers decided to explore the nearby Windom Peak.
After:O conquer:O ##ing:O Mount:B-MOUNTAIN E:I-MOUNTAIN ##ol:I-MOUNTAIN ##us:I-MOUNTAIN ,:O the:O hike:O ##rs:O decided:O to:O explore:O the:O nearby:O Wind:B-MOUNTAIN ##om:I-MOUNTAIN Peak:I-MOUNTAIN .:O

TEXT: In Colorado, Mount Columbia (Colorado) is a popular destination for winter sports enthusiasts.
In:O Colorado:O ,:O Mount:B-MOUNTAIN Columbia:I-MOUNTAIN (:I-MOUNTAIN Colorado:I-MOUNTAIN ):I-MOUNTAIN is:O a:O popular:O destination:O for:O winter:O sports:O enthusiasts:O .:O

TEXT: Among the highest peaks in the world, Challenger Point stands out for its challenging ascent.
Among:O the:O highest:O peaks:O in:O the:O world:O ,:O Challenger:B-MOUNTAIN Point:I-MOUNTAIN stands:O out:O for:O its:O challenging:O ascent:O .:O

Label mapping for Trainer:
label2id: {'O': 0, 'B-MOUNTAIN': 1, 'I-M


