# Skript to build the SOLR Core for cm_entities with details

In [1]:
import os, json, pprint, csv, pickle, pysolr, time, sys, requests
from typing import List, Dict, Any

In [2]:
root_path = "/data/scripts/cm/scripts/"
jl_output_path = "/data/scripts/jl/data/"
SOLR_URL = "http://localhost:8983/solr"
CORE_NAME = "cm_entities_beta"
CONFIG_SET = "_default"
CHUNK_SIZE = 200

In [3]:
# 1 entity name-uri mapping
name_to_uri = json.load(open(root_path + 'cooccurrence/output/name_to_uri.json', 'r', encoding="utf-8"))
uri_to_name = json.load(open(root_path + 'cooccurrence/output/uri_to_name.json', 'r', encoding="utf-8"))
# 2 entity pages inverted index
ep_inv_index = pickle.load(open(jl_output_path + 'entity_pages/ep_inv_index.pickle', 'rb'))
# 3 occurrence by journal
occurrence_by_journal = pickle.load(open(root_path + 'cooccurrence/output/occ_by_journal_detail.pickle', 'rb'))
# 4 related entities
entity_correlation = pickle.load(open(root_path + 'cooccurrence/output/entity_correlation.pickle', 'rb'))
# 5 data types 
classified_entities = json.load(open(root_path + 'jl_linking/classified_entities.json', 'r', encoding="utf-8"))
# 6 TAGME mentions
#cm_entities = pickle.load(open(root_path + 'cooccurrence/cm_tagme_resource_reference_data_05_03.pickle', 'rb'))

In [4]:
data = []
i,j,chunk = 0,0,0
for name, uri in name_to_uri.items():
    i += 1
    j += 1
    occs_by_journal = []
    for journal_info, mentions in occurrence_by_journal[uri].items():
        journal_mentions = {'j_name': journal_info[1],
                                'j_id': journal_info[0],
                                'first': mentions['first'],
                                'last': mentions['last'],
                               'mentions': mentions['data']
                               }
        occs_by_journal.append(journal_mentions)


    occs_by_journal = sorted(occs_by_journal, key=lambda x:len(x['mentions']), reverse=True)
    related_entities = sorted([[res_uri, uri_to_name[res_uri], score] for res_uri, score in entity_correlation[uri].items()], key=lambda x:x[2], reverse=True)

    # add entity type information to related entities
    for rel_ent in related_entities:
        ent_type = classified_entities[rel_ent[1].replace(' ', '_')]
        rel_ent.append(ent_type)


    label_name = name.replace(' ', '_')
    e_type = classified_entities[label_name]
    if len(e_type) == 0:
        e_type = "OTH"


    header = {'_id': str(j), '_index': 'cm_entities'}
    body = {'name': name,
            'e_type': e_type,
            'journal_occs': occs_by_journal,
            'related_entities': related_entities,
            'ep': ''
           }
    # to be fixed: not all entity pages exist!
    if uri in ep_inv_index:
        body['ep'] = ep_inv_index[uri]


    data.append(json.dumps({"index": header}, ensure_ascii=False))
    data.append(json.dumps(body, ensure_ascii=False))

    if i == 200:
        print(sys.getsizeof(data))
        with open('/data/cm/output/temp_journals_detail/chunk_'+str(chunk)+'.json', "w", encoding="utf-8") as write_file:
            write_file.write("\n".join(data))
        data = []
        i = 0
        chunk += 1
        print("chunk", chunk, end=" ")

    
with open('/data/cm/output/temp_journals_detail/chunk_'+str(chunk)+'.json', "w", encoding="utf-8") as write_file:
    write_file.write("\n".join(data))
print(i)

3256
chunk 1 3256
chunk 2 3256
chunk 3 3256
chunk 4 3256
chunk 5 3256
chunk 6 3256
chunk 7 3256
chunk 8 3256
chunk 9 3256
chunk 10 3256
chunk 11 3256
chunk 12 3256
chunk 13 3256
chunk 14 3256
chunk 15 3256
chunk 16 3256
chunk 17 3256
chunk 18 3256
chunk 19 3256
chunk 20 3256
chunk 21 3256
chunk 22 3256
chunk 23 3256
chunk 24 3256
chunk 25 3256
chunk 26 3256
chunk 27 3256
chunk 28 3256
chunk 29 3256
chunk 30 54


In [5]:
#define mappings
mappings = {
    "properties": {
        'name': {'type': 'text'},
        'ep': {'type': 'text'},
        'e_type': {'type': 'text'},
        'journal_occs': {'type': 'nested',
                        'properties': {
                            'j_name': {'type': 'text'},
                            'j_id': {'type': 'text'},
                            'first': {'type': 'integer'},
                            'last': {'type': 'integer'},
                            'mentions': {
                                'type': 'nested',
                                'properties': {
                                    'p_id': {'type': 'text'},
                                    'spot': {'type': 'text'},
                                    'start': {'type': 'integer'},
                                    'end': {'type': 'integer'},
                                    'p_link': {'type': 'text'},
                                    'date': {'type': 'text'}
                                } 
                            }
                        }
                        },
        'related_entities': {'type': 'text'}
    }
}

# Recreate the SOLR Core

In [7]:
# --- Configuration ---

def solr_core_exists(solr_url: str, core_name: str) -> bool:
    """
    Check if a given Solr core exists using the Core Admin STATUS action.
    """
    status_url = f"{solr_url}/admin/cores"
    params = {
        "action": "STATUS",
        "core": core_name,
        "wt": "json",
    }

    response = requests.get(status_url, params=params)
    response.raise_for_status()
    data = response.json()

    # If the core exists, it should appear in the "status" dict with some content
    status = data.get("status", {})
    return core_name in status and bool(status[core_name])


def solr_delete_core(solr_url: str, core_name: str) -> None:
    """
    Delete (unload) a Solr core and remove its index, data and instance directory.
    """
    unload_url = f"{solr_url}/admin/cores"
    params = {
        "action": "UNLOAD",
        "core": core_name,
        "deleteIndex": "true",
        "deleteDataDir": "true",
        "deleteInstanceDir": "true",
        "wt": "json",
    }

    print(f"Unloading and deleting core '{core_name}' …")
    response = requests.get(unload_url, params=params)
    response.raise_for_status()
    print(f"Core '{core_name}' successfully unloaded and deleted.")
    print(response.json())


def solr_create_core(solr_url: str, core_name: str, config_set: str) -> None:
    """
    Create a Solr core using a given configSet.
    """
    create_url = f"{solr_url}/admin/cores"
    params = {
        "action": "CREATE",
        "name": core_name,
        "configSet": config_set,
        "wt": "json",
    }

    print(f"Creating core '{core_name}' with configSet '{config_set}' …")
    response = requests.get(create_url, params=params)
    response.raise_for_status()
    print(f"Core '{core_name}' successfully created.")
    print(response.json())


def recreate_solr_core(solr_url: str, core_name: str, config_set: str) -> None:
    """
    Check if the core exists, delete it if necessary, and create it again.
    """
    try:
        if solr_core_exists(solr_url, core_name):
            print(f"Core '{core_name}' already exists.")
            solr_delete_core(solr_url, core_name)
            # Small delay to give Solr time to clean up
            time.sleep(2)
        else:
            print(f"Core '{core_name}' does not exist yet.")

        solr_create_core(solr_url, core_name, config_set)
    except requests.RequestException as e:
        print("Error while communicating with Solr:")
        print(e)

In [8]:
recreate_solr_core(SOLR_URL, CORE_NAME, CONFIG_SET)

Core 'cm_entities_beta' already exists.
Unloading and deleting core 'cm_entities_beta' …
Core 'cm_entities_beta' successfully unloaded and deleted.
{'responseHeader': {'status': 0, 'QTime': 179}}
Creating core 'cm_entities_beta' with configSet '_default' …
Core 'cm_entities_beta' successfully created.
{'responseHeader': {'status': 0, 'QTime': 231}, 'core': 'cm_entities_beta'}


In [9]:
print(data[:5])

['{"index": {"_id": "6001", "_index": "cm_entities"}}', '{"name": "Sigtuna", "e_type": "LOC", "journal_occs": [{"j_name": "Israelitische Wochenschrift für die religiösen und socialen Interessen des Judenthums", "j_id": "5369809", "first": 1877, "last": 1877, "mentions": [{"p_id": "9583660", "spot": "Sigtuna", "start": 3413, "end": 3420, "p_link": "9583660", "date": "1877-08-15", "year": 1877}]}], "related_entities": [["http://data.judaicalink.org/data/dbpedia/Stockholm", "Stockholm", 0.00015971889474524837, "LOC"]], "ep": "http://data.judaicalink.org/data/ep/1924433"}', '{"index": {"_id": "6002", "_index": "cm_entities"}}', '{"name": "Victor Hollaender", "e_type": "PER", "journal_occs": [{"j_name": "Israelitische Wochenschrift für die religiösen und socialen Interessen des Judenthums", "j_id": "5369809", "first": 1905, "last": 1905, "mentions": [{"p_id": "9607067", "spot": "Victor Hollaender", "start": 7630, "end": 7647, "p_link": "9607067", "date": "1905-11-24", "year": 1905}]}], "rel

In [10]:
def build_nested_docs_from_bulk_lines(lines: List[str]) -> List[Dict[str, Any]]:
    """
    Transform Elasticsearch bulk-like NDJSON into Solr nested documents
    with three levels:
      - parent: entity
      - child: journal occurrence
      - grandchild: mentions within a journal
      - sibling children: related entities

    Expected input (alternating lines):
      {"index": {"_id": "0", "_index": "cm_entities"}}
      {
        "name": "...",
        "e_type": "...",
        "journal_occs": [
          {
            "j_name": "...",
            "j_id": "...",
            "first": 123,
            "last": 456,
            "mentions": [
              {
                "p_id": "...",
                "spot": "...",
                "start": 10,
                "end": 20,
                "p_link": "...",
                "date": "YYYY-MM-DD"
              },
              ...
            ]
          },
          ...
        ],
        "related_entities": [
           [rel_uri, rel_name, score, ent_type],
           ...
        ],
        "ep": "..."
      }
    """
    docs: List[Dict[str, Any]] = []
    current_id: str | None = None

    for line in lines:
        line = line.strip()
        if not line:
            continue

        obj = json.loads(line)

        # Meta line from Elasticsearch bulk
        if "index" in obj:
            current_id = str(obj["index"]["_id"])
            continue

        if current_id is None:
            # No index line before – skip or raise, here we skip
            continue

        body = obj
        parent_id = current_id  # you can change this to body["ep"] or similar if you prefer

        # --- Parent document (entity) ---
        parent_doc: Dict[str, Any] = {
            "id": parent_id,
            "name": body.get("name"),
            "e_type": body.get("e_type"),
            "ep": body.get("ep"),
        }

        child_docs: List[Dict[str, Any]] = []

        # --- Journal occurrences (children) ---
        for j_idx, journal_occ in enumerate(body.get("journal_occs", [])):
            j_name = journal_occ.get("j_name")
            j_id = journal_occ.get("j_id")
            first = journal_occ.get("first")
            last = journal_occ.get("last")
            mentions = journal_occ.get("mentions", [])

            # Grandchildren: individual mentions in this journal
            mention_children: List[Dict[str, Any]] = []
            for m_idx, mention in enumerate(mentions):
                mention_children.append(
                    {
                        # every child doc needs an id because Solr's uniqueKey is required
                        "id": f"{parent_id}_j_{j_idx}_m_{m_idx}",
                        "type": "mention",
                        "p_id": mention.get("p_id"),
                        "spot": mention.get("spot"),
                        "start": int(mention.get("start")) if mention.get("start") is not None else None,
                        "end": int(mention.get("end")) if mention.get("end") is not None else None,
                        "p_link": mention.get("p_link"),
                        "date": mention.get("date"),
                    }
                )

            journal_doc: Dict[str, Any] = {
                "id": f"{parent_id}_j_{j_idx}",
                "type": "journal_occurrence",
                "j_name": j_name,
                "j_id": j_id,
                "first": int(first) if first is not None else None,
                "last": int(last) if last is not None else None,
            }

            if mention_children:
                journal_doc["_childDocuments_"] = mention_children

            child_docs.append(journal_doc)

        # --- Related entities (children on same level as journals) ---
        for r_idx, rel in enumerate(body.get("related_entities", [])):
            # rel = [rel_uri, rel_name, score, ent_type]
            if len(rel) < 4:
                # safety: skip malformed entries
                continue
            rel_uri, rel_name, score, rel_type = rel
            child_docs.append(
                {
                    "id": f"{parent_id}_r_{r_idx}",
                    "type": "related_entity",
                    "rel_uri": rel_uri,
                    "rel_name": rel_name,
                    "rel_score": float(score),
                    "rel_e_type": rel_type,
                }
            )

        if child_docs:
            parent_doc["_childDocuments_"] = child_docs

        docs.append(parent_doc)

    return docs

# Indexing data

In [11]:
def iter_chunks(items, size: int):
    """Yield successive chunks of `size` from list `items`."""
    for i in range(0, len(items), size):
        yield items[i : i + size]


def upload_nested_docs_to_solr(
    docs: List[Dict[str, Any]],
    solr_url: str = SOLR_URL,
    core_name: str = CORE_NAME,
    chunk_size: int = CHUNK_SIZE,
):
    """
    Upload nested documents (parent + children) to Solr in chunks using pysolr.
    """
    solr = pysolr.Solr(f"{solr_url}/{core_name}", timeout=200)

    total = len(docs)
    print(f"Indexing {total} parent docs (with children) into '{core_name}' …")

    start = time.time()
    for idx, chunk in enumerate(iter_chunks(docs, chunk_size), start=1):
        print(f"  Sending chunk {idx} ({len(chunk)} parent docs)…")
        solr.add(chunk, commit=False)

    solr.commit()
    end = time.time()
    print(f"Finished indexing in {end - start:.2f} seconds.")


In [None]:
nested_docs = build_nested_docs_from_bulk_lines(data)
upload_nested_docs_to_solr(nested_docs)

# Load from chunk data

In [12]:
import glob
import os

chunk_dir = "/data/cm/output/temp_journals_detail"

# optional sort for reproducible order
chunk_files = sorted(glob.glob(os.path.join(chunk_dir, "chunk_*.json")))

for fname in chunk_files:
    print(f"Processing {fname} …")
    with open(fname, "r", encoding="utf-8") as f:
        lines = [line.rstrip("\n") for line in f]

    nested_docs = build_nested_docs_from_bulk_lines(lines)
    upload_nested_docs_to_solr(nested_docs)


Processing /data/cm/output/temp_journals_detail/chunk_0.json …
Indexing 200 parent docs (with children) into 'cm_entities_beta' …
  Sending chunk 1 (200 parent docs)…
Finished indexing in 29.12 seconds.
Processing /data/cm/output/temp_journals_detail/chunk_1.json …
Indexing 200 parent docs (with children) into 'cm_entities_beta' …
  Sending chunk 1 (200 parent docs)…
Finished indexing in 6.06 seconds.
Processing /data/cm/output/temp_journals_detail/chunk_10.json …
Indexing 200 parent docs (with children) into 'cm_entities_beta' …
  Sending chunk 1 (200 parent docs)…
Finished indexing in 1.31 seconds.
Processing /data/cm/output/temp_journals_detail/chunk_11.json …
Indexing 200 parent docs (with children) into 'cm_entities_beta' …
  Sending chunk 1 (200 parent docs)…
Finished indexing in 1.17 seconds.
Processing /data/cm/output/temp_journals_detail/chunk_12.json …
Indexing 200 parent docs (with children) into 'cm_entities_beta' …
  Sending chunk 1 (200 parent docs)…
Finished indexing in 

# Test query

In [13]:
# test query
solr = pysolr.Solr(f"{SOLR_URL}/{CORE_NAME}", always_commit=False, timeout=10)
start = time.time()
res = solr.search('name:Israel', index=CORE_NAME, rows=10000)
end = time.time()
print("Search took", round(end-start, 2))

Search took 0.04


In [14]:
print(res.hits)

30


In [15]:
for doc in res:
    print(doc)
    #for el in doc['journal_occs']:
    #    print(el)

{'id': '18', 'name': ['Israel'], 'e_type': ['LOC'], 'ep': ['http://data.judaicalink.org/data/ep/1188573'], '_version_': 1850522079782240256}
{'id': '2166', 'name': ['Israel Friedlaender'], 'e_type': ['PER'], 'ep': ['http://data.judaicalink.org/data/ep/1265162'], '_version_': 1850522111931580416}
{'id': '2874', 'name': ['Israel Jacobson'], 'e_type': ['PER'], 'ep': ['http://data.judaicalink.org/data/ep/1005126'], '_version_': 1850522116434165760}
{'id': '2219', 'name': ['Israel Belkind'], 'e_type': ['PER'], 'ep': ['http://data.judaicalink.org/data/ep/1005119'], '_version_': 1850522113038876673}
{'id': '5466', 'name': ['Israel Gutman'], 'e_type': ['PER'], 'ep': ['http://data.judaicalink.org/data/ep/1005123'], '_version_': 1850522136200871940}
{'id': '5492', 'name': ['Israel Nadschara'], 'e_type': ['PER'], 'ep': ['http://data.judaicalink.org/data/ep/2598311'], '_version_': 1850522136206114821}
{'id': '5804', 'name': ['Israel Aksenfeld'], 'e_type': ['PER'], 'ep': ['http://data.judaicalink.o