# Searching for articles in Wikidata

In [None]:
import os
os.chdir('../../')
print(os.getcwd())

import requests
import regex as re
import numpy as np
import json
import torch
from tqdm.notebook import tqdm
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from utils import json_helpers as jh
from utils.paths import *

import concurrent.futures

# API endpoints used for Wikidata lookups and Swedish Wikipedia page fetches.
WIKIDATA_URL = "https://www.wikidata.org/w/api.php"
WIKIPEDIA_URL = "https://sv.wikipedia.org/w/api.php"
# Sentence-embedding model used to compare texts; loaded on the CPU.
model = SentenceTransformer('KBLab/sentence-bert-swedish-cased', device="cpu")
# One shared HTTP session for every API request made in this notebook.
session = requests.Session()

# Number of Wikidata candidates requested per search.
SEARCH_LIMIT = 5
# A candidate is accepted only when its cosine similarity exceeds this value.
MATCH_THRESHOLD = 0.6

In [None]:
# Get entries from json file
def read_items(filename: str) -> list[dict]:
    """Load and return the parsed content of ``<filename>.json``.

    The ``.json`` suffix is appended to *filename* and the file is read as
    UTF-8.
    """
    json_path = f"{filename}.json"
    with open(json_path, 'r', encoding='utf-8') as source:
        return json.load(source)

# Write entries to json file
def write_items(items: list[dict], filename: str) -> None:
    """Serialise *items* as indented JSON and append it to ``<filename>.json``.

    The file is opened in append mode, so calling this repeatedly with the
    same *filename* leaves several JSON documents back to back in one file.
    Non-ASCII characters are written as-is (UTF-8), not escaped.
    """
    target_path = f"{filename}.json"
    with open(target_path, 'a', encoding='utf-8') as sink:
        sink.write(json.dumps(items, ensure_ascii=False, indent=4))

In [None]:
def search_wikidata(query: str, limit: int):
    """Search Wikidata entities matching *query*.

    Args:
        query: Text to search for.
        limit: Maximum number of search results to request.

    Returns:
        The value of the ``'search'`` key of the JSON response, or ``None``
        when the response has no such key.
    """
    request_params = {
        "action": "wbsearchentities",
        "format": "json",
        # Language dictating how searches are made
        "language": "sv",
        # Language of item description
        "uselang": "sv",
        # Number of search results
        "limit": limit,
        "search": query,
    }
    payload = session.get(WIKIDATA_URL, params=request_params).json()
    return payload.get('search')

def get_wikipedia_title_from_qid(qid: str):
    """Return the Swedish Wikipedia page title linked to a Wikidata item.

    Args:
        qid: Wikidata entity id to look up.

    Returns:
        The title of the ``svwiki`` sitelink, or ``None`` when the response
        has no entry for *qid*, the entry has no sitelinks, or none of them
        is for ``svwiki``.
    """
    params = {
        "action": "wbgetentities",
        "format": "json",
        "props": "sitelinks",
        "ids": qid,
        "sitefilter": "svwiki",
        # NOTE(review): "se" is the code for Northern Sami, Swedish is "sv".
        # Left unchanged; only sitelinks are requested, so this presumably
        # has no effect on the result - confirm against the API docs.
        "languages": "se",
    }
    response = session.get(WIKIDATA_URL, params=params)
    # Error responses carry no 'entities' key; treat them as "not found"
    # instead of raising KeyError.
    entity_data = response.json().get('entities', {}).get(qid)
    if not entity_data:
        return None

    # An entry without a 'sitelinks' key (presumably what the API returns for
    # unknown ids - confirm) is treated like an entry without a Swedish page.
    wikipedia_dict = entity_data.get('sitelinks', {}).get('svwiki')
    if not wikipedia_dict:
        return None

    return wikipedia_dict['title']

def get_first_paragraph_text_wikipedia(qid: str):
    """Return the first paragraph of the Swedish Wikipedia page for *qid*.

    Args:
        qid: Wikidata entity id whose linked Swedish Wikipedia page is read.

    Returns:
        The text of the first ``<p>...</p>`` element of the rendered page with
        HTML tags and bracketed spans (``[...]``) removed and surrounding
        whitespace stripped. ``None`` when no Swedish page is linked, the
        parse request reports an error, or no paragraph is found.
    """
    page_title = get_wikipedia_title_from_qid(qid)
    if not page_title:
        # No linked page: skip the parse request, which could only fail.
        return None

    # Step 1: Get the rendered Wikipedia page content
    page_params = {
        "action": "parse",
        "format": "json",
        "page": page_title,
        "prop": "text",
    }
    page_response = session.get(WIKIPEDIA_URL, params=page_params)
    page_data = page_response.json()

    if 'error' in page_data:
        return None

    # Step 2: Extract the text of the first paragraph
    page_text = page_data['parse']['text']['*']

    # Non-greedy match so only the first <p>...</p> pair is captured;
    # DOTALL lets the paragraph span several lines.
    first_paragraph_match = re.search(r'<p>(.*?)</p>', page_text, re.DOTALL)
    if not first_paragraph_match:
        return None

    # Remove HTML tags, then bracketed spans, from the paragraph
    first_paragraph = re.sub(r'<.*?>', '', first_paragraph_match.group(1))
    first_paragraph = re.sub(r'\[.*?\]', '', first_paragraph)
    return first_paragraph.strip()

def search_property(qid: str, prop: str ='P625'):
    """Return the value of the first claim of *prop* on a Wikidata item.

    Args:
        qid: Wikidata entity id to inspect.
        prop: Property id to read; defaults to ``'P625'``.

    Returns:
        The ``mainsnak.datavalue.value`` of the first claim for *prop*, or
        ``None`` when the entity, its claims, the property, or the value is
        missing from the response.
    """
    params = {
        "action": "wbgetentities",
        "format": "json",
        # NOTE(review): "se" is the code for Northern Sami, Swedish is "sv".
        # Left unchanged; only claims are requested, so this presumably has
        # no effect on the result - confirm against the API docs.
        "languages": "se",
        "ids": qid,
        "props": "claims",
    }
    response = session.get(WIKIDATA_URL, params=params)
    # Error responses carry no 'entities' key; treat them as "not found"
    # instead of raising KeyError.
    data = response.json().get('entities', {}).get(qid)
    if not data:
        return None

    # An entry may come back without a 'claims' key; handle it like an
    # entity that simply lacks the property.
    prop_claim = data.get('claims', {}).get(prop)
    if not prop_claim:
        return None

    # Only the first claim is considered; each level may be absent.
    return prop_claim[0].get('mainsnak', {}).get('datavalue', {}).get('value', None)

def get_qid(entity):
    """Return the value stored under the ``'id'`` key of *entity*."""
    qid = entity['id']
    return qid

def get_description(entity):
    """Return ``entity['display']['description']['value']``.

    Each level is optional; an empty string is returned when any of the
    three keys is absent.
    """
    display = entity.get('display', {})
    description = display.get('description', {})
    return description.get('value', '')


In [None]:
# Smoke test: display the search results for "Glenalmond" (limit of 5).
search_wikidata("Glenalmond", 5)

In [None]:
def get_all_wikipedia_titles_from_qids(qids: list):
    """Look up the Swedish Wikipedia titles of several Wikidata items at once.

    Args:
        qids: Wikidata entity ids; they are sent in one request, joined
            with ``|``.

    Returns:
        A dict mapping each id that has an ``svwiki`` sitelink to that
        sitelink's title. Ids without one are left out.
    """
    if not qids:
        # Nothing to look up; avoid a request with an empty id list.
        return {}

    # NOTE(review): the API caps the number of ids per request (50 for
    # regular clients according to its documentation - confirm); callers in
    # this notebook pass only a handful.
    params = {
        "action": "wbgetentities",
        "format": "json",
        "props": "sitelinks",
        "ids": "|".join(qids),
        "sitefilter": "svwiki",
        "languages": "se",
    }
    response = session.get(WIKIDATA_URL, params=params)
    entities_data = response.json().get('entities', {})

    titles = {}
    for qid, entity_data in entities_data.items():
        # Entries may lack a 'sitelinks' key (presumably for unknown ids -
        # confirm); skip them instead of raising KeyError.
        wikipedia_dict = entity_data.get('sitelinks', {}).get('svwiki')
        if wikipedia_dict:
            titles[qid] = wikipedia_dict['title']
    return titles

def get_first_paragraph_text_from_title(page_title: str):
    """Fetch a Swedish Wikipedia page and return its first paragraph as text.

    Args:
        page_title: Title of the page to parse.

    Returns:
        The content of the first ``<p>...</p>`` element with HTML tags and
        bracketed spans (``[...]``) removed and surrounding whitespace
        stripped, or ``None`` when the API reports an error or the page has
        no such element.
    """
    reply = session.get(
        WIKIPEDIA_URL,
        params={
            "action": "parse",
            "format": "json",
            "page": page_title,
            "prop": "text",
        },
    ).json()
    if 'error' in reply:
        return None

    html = reply['parse']['text']['*']
    hit = re.search(r'<p>(.*?)</p>', html, re.DOTALL)
    if not hit:
        return None

    # Strip markup first, then bracketed spans.
    text = hit.group(1)
    for pattern in (r'<.*?>', r'\[.*?\]'):
        text = re.sub(pattern, '', text)
    return text.strip()

def get_all_first_paragraph_text_wikipedia(qids: list):
    """Fetch the first Wikipedia paragraph for every item in *qids*.

    Titles are resolved with one call, then each page is fetched on a
    thread pool.

    Returns:
        A dict from id to paragraph text for the ids that have a Swedish
        page title. The value is ``None`` when the fetch returned nothing or
        raised. Entries are inserted in completion order.
    """
    titles_by_qid = get_all_wikipedia_titles_from_qids(qids)

    paragraphs = {}
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = {}
        for qid, title in titles_by_qid.items():
            pending[pool.submit(get_first_paragraph_text_from_title, title)] = qid

        for done in concurrent.futures.as_completed(pending):
            owner = pending[done]
            try:
                text = done.result()
            except Exception:
                # Best effort: a failed fetch counts as "no text".
                text = None
            paragraphs[owner] = text
    return paragraphs

In [None]:
def process_qid(qid, descriptions_texts, wikipedia_texts):
    """Return the embedding used to represent the candidate *qid*.

    The Wikipedia text is preferred and the description is the fallback.
    Only the first 200 characters of the chosen text are encoded. When
    neither text is available a list of 768 zeros is returned.
    """
    description = descriptions_texts.get(qid, None)
    wikipedia = wikipedia_texts.get(qid, None)

    chosen = wikipedia or description
    if not chosen:
        return [0] * 768
    return model.encode(chosen[:200]).tolist()

def concurrent_vector_processing(qids, descriptions_texts, wikipedia_texts):
    """Compute one embedding per id in *qids* on a thread pool.

    Args:
        qids: Candidate ids, in the order the caller wants results back.
        descriptions_texts: Mapping from id to description text.
        wikipedia_texts: Mapping from id to Wikipedia text.

    Returns:
        A list of vectors where element ``i`` belongs to ``qids[i]``.
        Callers pick the best candidate by position, so results are
        collected in submission order rather than completion order.
    """
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(process_qid, qid, descriptions_texts, wikipedia_texts)
            for qid in qids
        ]
        # Work still runs concurrently; only the collection order is fixed.
        return [future.result() for future in futures]

## Compare entries with Wikidata descriptions using cosine similarity

In [None]:
# take the description of each item, compute embedding with kb-sbert
# where should we take the data from? qdrant or json or what
def search(edition_file: str,  end_item: int, outfile: str, search_limit: int, match_threshold: float, start_item: int=0):
    """Link encyclopedia entries to Wikidata items and write them in batches.

    Entries ``start_item`` (inclusive) to ``end_item`` (exclusive, capped at
    the number of entries) are read from *edition_file*. For each entry the
    Wikidata candidate with the highest cosine similarity is chosen. If its
    score exceeds *match_threshold*, the entry gets a ``'qid'`` key and,
    when the item has a P625 (coordinate location) value, ``'latitude'``
    and ``'longitude'`` keys. Entries are modified in place and handed to
    ``jh.write_items`` in batches of 1000.

    Args:
        edition_file: Path passed to ``jh.read_items``.
        end_item: Index after the last entry to process.
        outfile: Path passed to ``jh.write_items`` for every batch.
        search_limit: Number of Wikidata candidates requested per entry.
        match_threshold: Minimum (exclusive) cosine similarity for a match.
        start_item: Index of the first entry to process.

    Nothing is written when the selected range is empty.
    """
    items = jh.read_items(edition_file)
    end_item = min(len(items), end_item)
    nr_items = end_item - start_item
    items = items[start_item: end_item]

    batch_size = 1000

    # Stepping through the range gives exactly ceil(nr_items / batch_size)
    # batches, so no empty trailing batch is written.
    for start in tqdm(range(0, nr_items, batch_size)):
        end = min(start + batch_size, nr_items)
        batch = items[start: end]
        for e in batch:
            _link_entry(e, search_limit, match_threshold)

        jh.write_items(batch, outfile)

        # Absolute index in the edition file, usable as the next start_item - 1.
        print(f"Last index of item: {start_item + end - 1}")


def _link_entry(e: dict, search_limit: int, match_threshold: float) -> None:
    """Add the best-matching Wikidata id (and its coordinates) to *e* in place."""
    if e['class'] == 0 or e['cross_ref_key'] != "":  # Ignore cross references
        return

    result = search_wikidata(e['headword'], search_limit)
    if not result:
        return

    qids = [get_qid(item) for item in result]
    descriptions_texts = {get_qid(item): get_description(item) for item in result}
    wikipedia_texts = get_all_first_paragraph_text_wikipedia(qids)

    # vectors[i] is assumed to be the embedding of qids[i].
    vectors = concurrent_vector_processing(qids, descriptions_texts, wikipedia_texts)

    example_vector = model.encode(e['text'])
    scores = cosine_similarity([example_vector], vectors)[0]

    best_match_index = list(scores).index(max(scores))
    if scores[best_match_index] > match_threshold:
        qid = qids[best_match_index]
        coords = search_property(qid)
        if coords is not None:
            e['latitude'] = coords['latitude']
            e['longitude'] = coords['longitude']

        e['qid'] = qid


### Try wiki searcher against test data to decide match threshold

In [None]:
def valid_location(entry: dict) -> bool:
    """Tell whether the predicted coordinates of *entry* match the expected ones.

    Args:
        entry: Dict with the expected coordinates under ``'correct_lat'`` and
            ``'correct_lon'`` and the predicted ones under ``'latitude'`` and
            ``'longitude'``. Missing predicted keys are treated as ``None``,
            because entries that were never matched do not get them.

    Returns:
        ``True`` when all four values are ``None``, or when all four are set
        and both latitude and longitude differ by less than 0.5. ``False``
        otherwise, including when only some of the values are ``None``.
    """
    lat = entry['correct_lat']
    lon = entry['correct_lon']
    pred_lat = entry.get('latitude')
    pred_lon = entry.get('longitude')

    values = (lat, lon, pred_lat, pred_lon)
    if all(value is None for value in values):
        return True
    if any(value is None for value in values):
        return False

    # Tolerance chosen intuitively
    return bool(abs(lat - pred_lat) < 0.5 and abs(lon - pred_lon) < 0.5)

def eval_wiki_searcher(testfile: str, edition_nbr: int, match_threshold: float):
    """Run the Wikidata linker on a test file and log its location accuracy.

    ``search`` is run on the first 1000 entries of *testfile* with a search
    limit of 5, writing back to the same path. The file is then re-read and
    the share of entries accepted by ``valid_location`` is appended to
    ``e<edition_nbr>_stats_wiki.txt`` in ``WIKI_STATS_FOLDER``.

    Args:
        testfile: Path (without ``.json``) of the test data.
        edition_nbr: Edition number used in the statistics file name.
        match_threshold: Threshold forwarded to ``search``.

    An empty test file is reported with an accuracy of 0.0 instead of
    raising ``ZeroDivisionError``.
    """
    search(edition_file=testfile, outfile=testfile, search_limit=5, match_threshold=match_threshold, end_item=1000)

    entries = read_items(testfile)
    nr_entries = len(entries)
    nr_correct = sum(1 for entry in entries if valid_location(entry))

    accuracy = nr_correct / nr_entries if nr_entries else 0.0
    with open(f'{WIKI_STATS_FOLDER}/e{edition_nbr}_stats_wiki.txt', 'a', encoding='utf-8') as file:
        file.write(f'Match threshold: {match_threshold}\n')
        file.write(f'Accuracy score: {accuracy}\n')
        file.write('------------\n')


In [None]:
# Tests. We found 0.6 to work best for both.
# A high threshold is preferable because, after the location classifier,
# there may be non-locations classified as locations.
# MATCH_THRESHOLD = 0.6
eval_wiki_searcher(f'{WIKI_TEST_FOLDER}/e1_test_wiki', 1, MATCH_THRESHOLD)
eval_wiki_searcher(f'{WIKI_TEST_FOLDER}/e2_test_wiki', 2, MATCH_THRESHOLD)

### Link editions with wikidata.

In [None]:
#e2_file = '/content/drive/MyDrive/Colab Notebooks/EDAN70/e2'
#from google.colab import drive
#drive.mount('/content/drive')
# Machine-specific absolute paths, given without the ".json" suffix:
# the edition to link and the file that receives the linked entries.
e1_file = f'C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/encyclopedias_jsons/e1'
outfile = 'C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/e1_80_000'

In [None]:
# Display how many entries the edition file contains.
len(read_items(e1_file))

In [None]:
#outfile = '/content/drive/MyDrive/Colab Notebooks/EDAN70/e2_p1'
# Last checkpoint at 11000
# Link the entries from index 80_000 up to 180_000 (search caps the end at
# the number of entries in the file).
search(edition_file=e1_file, end_item=180_000, outfile=outfile, search_limit=SEARCH_LIMIT, match_threshold=MATCH_THRESHOLD, start_item=80_000)

In [None]:
import json
# Kept from the original cell: rebinds the notebook-wide name `re` (first
# imported as `regex`) to the standard-library module.
import re

# Read the checkpoint file. It is expected to hold several JSON arrays
# written back to back ("[...][...]"), presumably one per appended batch,
# which is not a valid JSON document on its own.
with open('C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/e1_80_000.json', 'r', encoding='utf-8') as file:
    data = file.read()

# Decode the arrays one after another and merge them into a single list.
# Unlike patching the text and wrapping it in extra brackets, this cannot
# produce a nested list, and it copes with empty batches ("[]").
decoder = json.JSONDecoder()
merged_items = []
position = 0
while position < len(data):
    if data[position].isspace():
        # raw_decode does not skip leading whitespace itself.
        position += 1
        continue
    batch, position = decoder.raw_decode(data, position)
    if isinstance(batch, list):
        merged_items.extend(batch)
    else:
        merged_items.append(batch)

# Serialise with the same settings the batches were written with.
fixed_data = json.dumps(merged_items, ensure_ascii=False, indent=4)

# Save the fixed JSON data to a new file
with open('C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/fixed_p2.json', 'w', encoding='utf-8') as file:
    file.write(fixed_data)

print("JSON file has been fixed and saved as 'C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/fixed_p2.json'")

In [None]:
# Load the two separately processed parts of edition 1.
# NOTE(review): the fix-up cell above writes 'fixed_p2.json', while this
# cell reads 'fixed_e1_p2.json' - confirm which file is the intended input.
e1_p1 = read_items('C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/e1_out')
e1_p2 = read_items('C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/fixed_e1_p2')

# Concatenate the parts and display the total number of entries.
e1_full = e1_p1 + e1_p2
len(e1_full)


In [None]:
import os
# NOTE(review): the first cell of the notebook already runs this chdir;
# running both in one session moves up four directory levels. Presumably
# this cell is meant to be run after a kernel restart - confirm.
os.chdir('../../')
print(os.getcwd())
from utils import json_helpers as jh
# Write the merged edition with the project's JSON helper.
jh.write_items(e1_full, 'C:/Users/Henrik/Desktop/edan70/nordisk-familjebok/fixed_e1')

## Test single entry on wikidata

In [None]:
def test_single_entry(headword: str, text: str):
    """Score the Wikidata candidates for one entry, for manual inspection.

    Args:
        headword: Term searched for on Wikidata (at most 5 candidates).
        text: Entry text whose embedding is compared with each candidate.

    Returns:
        A tuple ``(qids, wikipedia_texts, scores)``: the candidate ids, the
        Wikipedia paragraphs found for them, and the cosine similarity
        between *text* and each candidate. When the search yields no
        candidates, the ids, texts and scores are all empty.
    """
    result = search_wikidata(headword, 5)
    if not result:
        # search_wikidata returns None or an empty list when nothing is
        # found; there is nothing to score.
        return [], {}, np.array([])

    qids = [get_qid(item) for item in result]
    descriptions_texts = {get_qid(item): get_description(item) for item in result}
    wikipedia_texts = get_all_first_paragraph_text_wikipedia(qids)

    vectors = concurrent_vector_processing(qids, descriptions_texts, wikipedia_texts)

    example_vector = model.encode(text)
    scores = cosine_similarity([example_vector], vectors)[0]
    return qids, wikipedia_texts, scores

In [None]:
# Try the linker on the encyclopedia entry for Paris and display the
# candidate texts, ids and similarity scores.
qids, texts, scores = test_single_entry('Paris', "<b>Paris</b> [franskt utt. pari], Frankrikes hufvudstad, näst London Europas folkrikaste stad, ligger under 2° 20' 15\" ö. lgd samt 48° 50' 11,2\" n. br. (nationalobservatoriet), på båda sidor om Seine,")
texts, qids, scores