In [1]:
import tarfile
import io
import zipfile
import importlib
import regex as re
import pyperclip  
import TexSoup as TS
from TexSoup.tokens import MATH_ENV_NAMES
import os


In [2]:
import pandas as pd

# Load the datasets
scopus_data = pd.read_csv('data/2201.00_scopus_931.csv')
ror_mapping = pd.read_csv('matching_data/matched_results_fuzzy.csv')

# Merge the datasets on 'Primary Org Id'
merged_data = pd.merge(
    scopus_data,
    ror_mapping,
    on='Primary Org Id',
    how='left'  # Use 'left' to keep all rows from scopus_data even if no match is found
)

# Calculate the number of unmatched rows
unmatched_count = merged_data['ROR ID'].isna().sum()

print(f"Number of unmatched 'Primary Org Id' values: {unmatched_count}")

# Save the merged data to a new CSV file
merged_data.to_csv('data/2201.00_scopus_931_with_ror.csv', index=False)

print("Matching completed. The output is saved as 'data/2201.00_scopus_931_with_ror.csv'.")

import os
import pandas as pd

# Paths
csv_path = 'data/2201.00_scopus_931_with_ror.csv'
text_folder = 'data/2201_00_text'

# Read the CSV file
df = pd.read_csv(csv_path)

# Assuming the column containing ArXiv IDs is named 'ArXiv Id'
arxiv_ids = df['ArXiv Id'].dropna().unique()  # Drop NaN values and get unique IDs

# Initialize counters and a list to store missing files
found_count = 0
total_count = len(arxiv_ids)
missing_files = []

# Check for each ArXiv ID
for arxiv_id in arxiv_ids:
    txt_file_path = os.path.join(text_folder, f"{arxiv_id}.txt")
    if os.path.isfile(txt_file_path):
        found_count += 1
    else:
        missing_files.append(arxiv_id)

# Calculate the percentage
percentage = (found_count / total_count) * 100

# Display results
print(f"Total ArXiv IDs: {total_count}")
print(f"Found text files: {found_count}")
print(f"Percentage: {percentage:.2f}%")
print("\nMissing files:")
for missing_id in missing_files:
    print(missing_id)


import pandas as pd

# Load the CSV file
file_path = 'data/2201.00_scopus_931_with_ror.csv'
df = pd.read_csv(file_path)

# Select the desired columns
columns_to_keep = ['Primary Org Id', 'Primary Org Name_x', 'ArXiv Id', 'ROR ID']
df_selected = df[columns_to_keep]

# Save the preprocessed data to a new CSV file
output_file_path = 'data/preprocessed_scopus_data.csv'
df_selected.to_csv(output_file_path, index=False)

print(f"Preprocessed data saved to {output_file_path}")

Number of unmatched 'Primary Org Id' values: 24
Matching completed. The output is saved as 'data/2201.00_scopus_931_with_ror.csv'.
Total ArXiv IDs: 931
Found text files: 931
Percentage: 100.00%

Missing files:
Preprocessed data saved to data/preprocessed_scopus_data.csv


In [3]:
import pandas as pd
from extractors.trie_extractor import TrieExtractor
from utils.file_reader import read_file
from tqdm import tqdm  # Import tqdm for progress bar

# Load the CSV file
csv_path = "data/2201.00_scopus_931_with_ror.csv"
df = pd.read_csv(csv_path)

# Initialize the TrieExtractor
extractor = TrieExtractor(data_path="data/1.34_extracted_ror_data.csv", common_words_path="data/common_english_words.txt")

In [4]:
import zipfile
import re
import io

def find_doc_class(fp, name_match=False):
    """Search for document class related lines in a file and return a code to represent the type"""
    doc_class_pat = re.compile(r"^\s*\\document(?:style|class)")
    sub_doc_class = re.compile(r"^\s*\\document(?:style|class).*(?:\{standalone\}|\{subfiles\})")

    file_content = fp.read()
    try:
        file_text = file_content.decode('utf-8')
    except UnicodeDecodeError:
        file_text = file_content.decode('latin-1')

    for line in file_text.splitlines():
        if doc_class_pat.search(line):
            if name_match:
                if sub_doc_class.search(line):
                    return -99999
                return 1  # Found document class line
    return 0  # No document class line found

def find_main_tex_source_in_folder(zip_file, folder_name):
    """Find the main .tex file inside a folder within the zip archive"""
    tex_names = {"paper", "main", "ms.", "article"}
    
    # Get all .tex files in the folder
    tex_files = [f for f in zip_file.namelist() if f.startswith(folder_name + '/') and f.endswith('.tex')]

    if len(tex_files) == 1:
        return tex_files[0]  # If there's only one .tex file, return it

    main_files = {}
    for tex_file in tex_files:
        depth = tex_file.count('/') - folder_name.count('/')  # Depth relative to folder
        has_main_name = any(kw in tex_file for kw in tex_names)
        
        with zip_file.open(tex_file) as fp:
            main_files[tex_file] = find_doc_class(fp, name_match=has_main_name) - depth

    return max(main_files, key=main_files.get) if main_files else None

def pre_format(text):
    """Format LaTeX text by adding spaces where necessary"""
    # return text.replace('\\}\\', '\\} \\').replace(')}', ') }').replace(')$', ') $')
    return text

def source_from_zip(zip_file, folder_name):
    """Extract and decode the main .tex file from a folder inside the zip archive"""
    tex_main = find_main_tex_source_in_folder(zip_file, folder_name)
    if tex_main:
        with zip_file.open(tex_main) as fp:
            file_content = fp.read()
            try:
                source_text = pre_format(file_content.decode('utf-8'))
            except UnicodeDecodeError:
                source_text = pre_format(file_content.decode('latin-1'))
            return source_text
    return None

def extract_before_abstract(source_text):
    """Extract text before the abstract section"""
    no_comments_text = re.sub(r'(?<!\\)%.*', '', source_text)  # Remove comments
    # no_usepackage_text = re.sub(r'\\usepackage\s*\{[^}]+\}', '', no_comments_text)  # Remove usepackage
    # text = re.sub(r'\\[a-zA-Z]+\{[^}]*\}', '', no_usepackage_text)  # Remove LaTeX commands
    text = no_comments_text
    # text = re.sub(r'\\[a-zA-Z]+\[[^\]]*\]\{[^}]*\}', '', text)
    # text = re.sub(r'\$[^$]*\$', '', text)  # Remove inline math
    # text = text.replace('{', '').replace('}', '').replace('\n', ' ')
    text = ' '.join(text.split())

    # output_file.write(f"text: {text}\n")

    abstract_word_match = re.search(r'begin{abstract}', text, re.IGNORECASE)
    if abstract_word_match:
        return text[:abstract_word_match.start()].strip()

    abstract_match = re.search(r'abstract{', text)
    if abstract_match:
        return text[:abstract_match.start()].strip()
    
    nomacro_abstract_match_2 = re.search(r'abstract', text)
    if nomacro_abstract_match_2:
        return text[:nomacro_abstract_match_2.start()].strip()
    
    nomacro_abstract_match_1 = re.search(r'Abstract', text)
    if nomacro_abstract_match_1:
        return text[:nomacro_abstract_match_1.start()].strip()
    
    return None

zip_file_path = "./2311_tex_test.zip"
output_path = "./output.txt"


with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
    base_folder = zip_file.namelist()[0]
    folders = {name.rstrip('/') for name in zip_file.namelist() if name.startswith(base_folder) and name.endswith('/') and name != base_folder}  # Get folder names

    with open(output_path, 'w', encoding='utf-8') as output_file:
        for folder in folders:
            source_text = source_from_zip(zip_file, folder)
            # output_file.write(f"{folder}:source text: {source_text}\n")
            if source_text:
                content_before_abstract = extract_before_abstract(source_text)
                if content_before_abstract:
                    output_file.write(f"Content before abstract in {folder}:\n{content_before_abstract}\n\n")
                else:
                    output_file.write(f"No abstract found in {folder}, or no content before abstract.\n\n")
            else:
                output_file.write(f"No source found in {folder}.\n\n")

In [5]:
# Read input from a text file and filter out files without valid content before the abstract
input_file_path = 'output.txt'  # Replace with your actual file path
output_file_path = 'filtered_files_with_content_macro.txt'

# Variables to keep track of statistics
total_files_count_author = 0
valid_files_count = 0
valid_files_with_content = []

# Reading and processing the input file
with open(input_file_path, 'r') as infile:
    content_blocks = infile.read().split('Content before abstract in ')
    
    for block in content_blocks:
        if block.strip():  # Ensure we are not processing an empty block
            total_files_count_author += 1
            lines = block.split('\n', 1)  # Split to separate the file name from its content
            if len(lines) > 1:
                file_name = lines[0].strip().replace(':', '')
                content = lines[1].strip()
                
                # Check if the content does not indicate "No abstract found"
                if 'No abstract found' not in content and 'no content before abstract' not in content.lower():
                    valid_files_count += 1
                    valid_files_with_content.append((file_name, content))

# Writing the filtered results to an output file
with open(output_file_path, 'w') as outfile:
    for file_name, content in valid_files_with_content:
        outfile.write(f"Content before abstract in {file_name}:\n{content}\n\n")

# Print or save the statistics summary
print(f"Total number of files processed: {total_files_count_author}")
print(f"Total number of files with valid content before abstract: {valid_files_count}")
print(f"Filtered output saved in: {output_file_path}")


Total number of files processed: 83
Total number of files with valid content before abstract: 80
Filtered output saved in: filtered_files_with_content_macro.txt


In [6]:
import re
import os
import json  # For structured storage

# Define file paths
input_file_path = 'filtered_files_with_content_macro.txt'
output_file_path = './tagged_outputs/all_extracted_tags_cleaned.txt'  # Single output file

# Ensure output directory exists
os.makedirs(os.path.dirname(output_file_path), exist_ok=True)

# Tags to search for (without leading backslashes for dictionary keys)
tags_to_search = [
    'institution', 'affiliations', 'affiliation', 'icmlaffiliation',
    'institute', 'affil', 'aff', 'AFF', 'university', 'address'
]

# Dictionary to store extracted data
extracted_data = {}

# Function to extract content inside the first level of braces `{}` (handling optional `[]`)
def extract_tag_content(tag, content):
    pattern = rf"(\\{tag})(\[[^\]]*\])?\{{"
    results = []
    start = 0

    while (match := re.search(pattern, content[start:])) is not None:
        start_idx = start + match.end()  # Start after the macro (past `{`)
        brace_level = 1
        end_idx = start_idx

        # Find the matching closing brace
        while brace_level > 0 and end_idx < len(content):
            if content[end_idx] == '{':
                brace_level += 1
            elif content[end_idx] == '}':
                brace_level -= 1
            end_idx += 1

        # Extract only the content inside the first `{}` (excluding the macro name)
        extracted_content = content[start_idx:end_idx - 1].strip()  # Remove trailing `}`
        results.append(extracted_content)
        start = end_idx  # Move to next occurrence

    return results

# Function to process and store extracted content
def extract_and_store_tags(file_name, content, tags):
    extracted_data[file_name] = {}  # Initialize storage for this file
    found_any_tag = False  # Flag to check if we found any tag

    for tag in tags:
        matches = extract_tag_content(tag, content)

        if matches:
            extracted_data[file_name][tag] = matches  # Store under cleaned key
            found_any_tag = True  # At least one macro was found

    # If no macros were found, store the whole pre-abstract content
    if not found_any_tag:
        extracted_data[file_name]["full_content"] = content

# Process input file
total_files_count = 0

with open(input_file_path, 'r') as infile:
    content_blocks = infile.read().split('Content before abstract in ')

    for block in content_blocks:
        if block.strip():
            total_files_count += 1
            lines = block.split('\n', 1)

            if len(lines) > 1:
                file_name = lines[0].strip().replace(':', '')
                content = lines[1].strip()
                extract_and_store_tags(file_name, content, tags_to_search)

# Save extracted data to a single output file (JSON format for easy reuse)
with open(output_file_path, 'w', encoding='utf-8') as outfile:
    json.dump(extracted_data, outfile, indent=4, ensure_ascii=False)

# Print summary
print(f"Total number of files processed: {total_files_count}")
print(f"Extracted tag data saved in: {output_file_path}")


Total number of files processed: 80
Extracted tag data saved in: ./tagged_outputs/all_extracted_tags_cleaned.txt


In [7]:
import requests
import json
import logging
import os
from tenacity import retry, stop_after_attempt, wait_fixed

# Configure logging
logging.basicConfig(level=logging.INFO)

# OpenAlex API Information
OPENALEX_SEARCH_URL = 'https://api.openalex.org/institutions'

# Initialize cache
openalex_cache = {}

# Load persistent cache
cache_file = 'openalex_cache.json'
if os.path.exists(cache_file):
    with open(cache_file, 'r', encoding='utf-8') as f:
        openalex_cache = json.load(f)
    logging.info("Loaded persistent OpenAlex cache.")
else:
    logging.info("No persistent OpenAlex cache found. Using empty cache.")

@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
def query_openalex(institution_name):
    """
    Query OpenAlex API for institution details.
    """
    if institution_name in openalex_cache:
        return openalex_cache[institution_name]
    
    params = {'search': institution_name}

    try:
        response = requests.get(OPENALEX_SEARCH_URL, params=params, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data.get('results'):
                first_result = data['results'][0]  # Get the first search result
                
                openalex_cache[institution_name] = {
                    'OpenAlex_ID': first_result.get('id', 'N/A'),
                    'Name': first_result.get('display_name', 'N/A'),
                    'Hint': first_result.get('hint', 'N/A'),
                    'Cited_by_Count': first_result.get('cited_by_count', 'N/A'),
                    'Works_Count': first_result.get('works_count', 'N/A'),
                    'External_ID': first_result.get('external_id', 'N/A')  # This may include a ROR link
                }
                logging.info(f"First OpenAlex result found for '{institution_name}': {first_result.get('id', 'N/A')}")
                return openalex_cache[institution_name]

            else:
                logging.warning(f"No OpenAlex information found for '{institution_name}'.")
                openalex_cache[institution_name] = None
                return None
        else:
            logging.error(f"OpenAlex API query failed with status code: {response.status_code} for institution: {institution_name}")
            openalex_cache[institution_name] = None
            return None
    except Exception as e:
        logging.error(f"OpenAlex API query exception for '{institution_name}', Error: {e}")
        openalex_cache[institution_name] = None
        return None

# Save cache to a file after all queries
def save_cache():
    with open(cache_file, 'w', encoding='utf-8') as f:
        json.dump(openalex_cache, f)
    logging.info("Saved OpenAlex cache to file.")




INFO:root:No persistent OpenAlex cache found. Using empty cache.


In [8]:
import json
import os
from extractors.trie_extractor import TrieExtractor
from utils.file_reader import read_file

# Define file paths
input_file_path = 'tagged_outputs/all_extracted_tags_cleaned.txt'
output_file_path = 'institution_output_with_ror.json'

# Load JSON data
with open(input_file_path, 'r', encoding='utf-8') as infile:
    extracted_data = json.load(infile)

# Load TrieExtractor for full content/address processing
extractor = TrieExtractor(
    data_path="data/1.34_extracted_ror_data.csv",
    common_words_path="data/common_english_words.txt"
)

# Dictionary to store results
results = {}


In [9]:
# Process each paper
for paper, macros in extracted_data.items():
    unique_openalex_ids = {}  # Dictionary to store unique OpenAlex results

    if set(macros.keys()) in [{"full_content"}, {"address"}]:
        # Extract from full content or address
        text = "\n".join(macros.get("full_content", [])) + "\n" + "\n".join(macros.get("address", []))
        extracted_openalex_ids = extractor.extract_affiliations(text)

        if extracted_openalex_ids:
            unique_openalex_ids.update({openalex_id: {"OpenAlex_ID": openalex_id} for openalex_id in extracted_openalex_ids})

    else:
        # Process institution names in macros (excluding full_content & address)
        for content_list in macros.values():
            for institution_name in content_list:
                openalex_info = query_openalex(institution_name)
                if openalex_info:
                    unique_openalex_ids[openalex_info["OpenAlex_ID"]] = openalex_info  # Store unique OpenAlex results

    # Save results if there are OpenAlex matches
    if unique_openalex_ids:
        results[paper] = list(unique_openalex_ids.values())  # Convert to list for JSON storage

# Save results to JSON file
with open(output_file_path, 'w', encoding='utf-8') as outfile:
    json.dump(results, outfile, indent=4, ensure_ascii=False)

print(f"✅ JSON export completed. Results saved to {output_file_path}")

ERROR:root:OpenAlex API query failed with status code: 500 for institution: Institut f\"{u}r Theoretische Physik, Universit\"{a}t Heidelberg, Philosophenweg 16, 69120, Heidelberg, Germany
ERROR:root:OpenAlex API query failed with status code: 500 for institution: Universit\"{a}t Heidelberg, Kirchhoff-Institut f\"{u}r Physik, Im Neuenheimer Feld 227, 69120 Heidelberg, Germany
ERROR:root:OpenAlex API query failed with status code: 500 for institution: Theoretical Astrophysics Department, Eberhard-Karls University of T\"{u}bingen, T\"{u}bingen 72076, Germany
ERROR:root:OpenAlex API query failed with status code: 500 for institution: Institut f\"ur Kernphysik, Johannes Gutenberg-Universit\"{a}t,\\ J.J. Becher-Weg 45, 55128 Mainz, Germany


✅ JSON export completed. Results saved to institution_output_with_ror.json


In [10]:
import pandas as pd

# Step 1: Read the CSV file
df = pd.read_csv('data/2201.00_scopus_931_with_ror.csv')

# Function to preprocess Extracted ROR ID
def preprocess_extracted_ror_id(extracted_ror_id):
    if pd.isna(extracted_ror_id):  # Skip NaN values
        return set()
    # Remove curly braces and split by comma
    extracted_ror_id = extracted_ror_id.strip("{}").split(",")
    # Remove any leading/trailing whitespace and filter out empty strings
    return set(item.strip().strip("''") for item in extracted_ror_id if item.strip())

# Step 2: Group by ArXiv ID and merge ROR ID and Extracted ROR ID into sets
grouped = df.groupby('ArXiv Id').agg({
    'ROR ID': lambda x: set(x.dropna()),  # Create a set of ROR IDs
    'Extracted ROR ID': lambda x: set().union(*x.apply(preprocess_extracted_ror_id))  # Preprocess and union Extracted ROR IDs
}).reset_index()

# Step 3: Calculate accuracy and wrong extraction rate
correct_extractions = 0
wrong_extractions = 0
total_ror_ids = 0

for index, row in grouped.iterrows():
    ror_ids = row['ROR ID']
    extracted_ror_ids = row['Extracted ROR ID']
    if ror_ids:  # Only consider rows with non-empty ROR ID
        total_ror_ids += len(ror_ids)
        correct_extractions += len(ror_ids.intersection(extracted_ror_ids))
        wrong_extractions += len(extracted_ror_ids - ror_ids)  # IDs in extraction but not in ground truth

# Calculate accuracy
accuracy = correct_extractions / total_ror_ids if total_ror_ids > 0 else 0

# Calculate wrong extraction rate
wrong_extraction_rate = wrong_extractions / total_ror_ids if total_ror_ids > 0 else 0

print(f"Accuracy: {accuracy:.2f}")
print(f"Wrong Extraction Rate: {wrong_extraction_rate:.2f}")


KeyError: "Column(s) ['Extracted ROR ID'] do not exist"

In [None]:
import pandas as pd

# Step 1: Read the CSV file
df = pd.read_csv('data/2201.00_scopus_931_with_ror.csv')

# Function to preprocess Extracted ROR ID
def preprocess_extracted_ror_id(extracted_ror_id):
    if pd.isna(extracted_ror_id):  # Skip NaN values
        return set()
    # Remove curly braces and split by comma
    extracted_ror_id = extracted_ror_id.strip("{}").split(",")
    # Remove any leading/trailing whitespace and filter out empty strings
    return set(item.strip().strip("''") for item in extracted_ror_id if item.strip())

# Step 2: Group by ArXiv ID and merge ROR ID and Extracted ROR ID into sets
grouped = df.groupby('ArXiv Id').agg({
    'ROR ID': lambda x: set(x.dropna()),  # Create a set of ROR IDs
    'Extracted ROR ID': lambda x: set().union(*x.apply(preprocess_extracted_ror_id))  # Preprocess and union Extracted ROR IDs
}).reset_index()

# Step 3: Collect cases where extracted ROR IDs do not perfectly match ground truth ROR IDs
mismatched_cases = []

for index, row in grouped.iterrows():
    ror_ids = row['ROR ID']
    extracted_ror_ids = row['Extracted ROR ID']
    if ror_ids != extracted_ror_ids:  # Check for imperfect match
        mismatched_cases.append({
            'ArXiv Id': row['ArXiv Id'],
            'Ground Truth ROR IDs': ror_ids,
            'Extracted ROR IDs': extracted_ror_ids,
            'Missing ROR IDs': ror_ids - extracted_ror_ids,  # IDs in ground truth but not in extraction
            'Extra ROR IDs': extracted_ror_ids - ror_ids  # IDs in extraction but not in ground truth
        })

# Convert the list of mismatched cases to a DataFrame
mismatched_df = pd.DataFrame(mismatched_cases)

# Save the mismatched cases to a CSV file
mismatched_df.to_csv('mismatched_cases.csv', index=False)

print(f"Total mismatched cases: {len(mismatched_df)}")
print("Mismatched cases saved to 'mismatched_cases.csv'")

In [11]:
import requests
import json
import logging
import os
from tenacity import retry, stop_after_attempt, wait_fixed

# Configure logging
logging.basicConfig(level=logging.INFO)

# Configure ROR API information
ROR_SEARCH_URL = 'https://api.ror.org/organizations'

# Initialize ROR cache
ror_cache = {}

# Load persistent cache
cache_file = 'ror_cache.json'
if os.path.exists(cache_file):
    with open(cache_file, 'r', encoding='utf-8') as f:
        ror_cache = json.load(f)
    logging.info("Loaded persistent ROR cache.")
else:
    logging.info("No persistent ROR cache found. Using empty cache.")

@retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
def query_ror(institution_name):
    """
    Query ROR information, returning details from the first result in the response.
    """
    if institution_name in ror_cache:
        return ror_cache[institution_name]
    
    params = {'query': institution_name}

    try:
        response = requests.get(ROR_SEARCH_URL, params=params, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data.get('items'):
                first_result = data['items'][0]  # Get the first search result
                
                ror_cache[institution_name] = {
                    'ROR_ID': first_result.get('id', 'N/A'),
                    'Name': first_result.get('name', 'N/A'),
                    'Country': first_result.get('country', {}).get('country_name', 'N/A'),
                    'Type': ', '.join(first_result.get('types', []))
                }
                logging.info(f"First ROR result found for '{institution_name}': {first_result.get('id', 'N/A')}")
                return ror_cache[institution_name]

            else:
                logging.warning(f"No ROR information found for '{institution_name}'.")
                ror_cache[institution_name] = None
                return None
        else:
            logging.error(f"ROR API query failed with status code: {response.status_code} for institution: {institution_name}")
            ror_cache[institution_name] = None
            return None
    except Exception as e:
        logging.error(f"ROR API query exception for '{institution_name}', Error: {e}")
        ror_cache[institution_name] = None
        return None


# Save cache to a file after all queries
def save_cache():
    with open(cache_file, 'w', encoding='utf-8') as f:
        json.dump(ror_cache, f)
    logging.info("Saved ROR cache to file.")

INFO:root:Loaded persistent ROR cache.


In [34]:
institution_name = '\footnote(Current address) \orgdivDepartment of Mathematics, \orgnameHampton University, \\ \orgaddress\street609 Norma B Harvey Road, \cityHampton, \postcode23669, \stateVirginia, \countryU.S.'
result1 = query_ror(institution_name)
result2 = query_openalex(institution_name)
print('From ROR:', json.dumps(result1, indent=4, ensure_ascii=False))
print('From OpenAlex:', json.dumps(result2, indent=4, ensure_ascii=False))

INFO:root:First ROR result found for 'ootnote(Current address) \orgdivDepartment of Mathematics, \orgnameHampton University, \ \orgaddress\street609 Norma B Harvey Road, \cityHampton, \postcode23669, \stateVirginia, \countryU.S.': https://ror.org/025t37b39


From ROR: {
    "ROR_ID": "https://ror.org/025t37b39",
    "Name": "B & B",
    "Country": "Slovenia",
    "Type": "Education"
}
From OpenAlex: null


In [36]:
import spacy

# Load English NLP model
nlp = spacy.load("en_core_web_sm")


In [38]:
import re
from geopy.geocoders import Nominatim

# Load a list of countries and cities
geolocator = Nominatim(user_agent="geo_cleaner")

# Example known city, country patterns (you can expand this)
COUNTRY_CITY_PATTERNS = [
    r",?\s*(USA|United States|UK|Germany|France|Italy|Sweden|Canada|China|India|Japan)$",
    r",?\s*[A-Za-z\s]+,\s*(USA|UK|Germany|France|Italy|Sweden|Canada|China|India|Japan)$",
    r",?\s*\d{5,},?\s*[A-Za-z\s]+$",  # Matches postal codes and city names
    r",?\s*(Department of|Faculty of|School of|Institute for|University of)$",  # Matches trailing dept names
]

def clean_institution_name(name):
    """Removes address-related parts from an institution name"""
    cleaned_name = name

    # Try to match and remove known patterns
    for pattern in COUNTRY_CITY_PATTERNS:
        cleaned_name = re.sub(pattern, "", cleaned_name, flags=re.IGNORECASE).strip()

    return cleaned_name

# Example test cases
institution_names = [
    "University of Minnesota, Minneapolis, MN 55455, USA",
    "Stockholm University, Stockholm 106 91, Sweden",
    "Technical University of Munich, Germany; Institute for Advanced Study, D-85748 Garching, Germany",
    "Institut für Theoretische Physik, Universität Heidelberg, Philosophenweg 16, 69120, Heidelberg, Germany"
]

for name in institution_names:
    print(f"Original: {name}")
    print(f"Cleaned: {clean_institution_name(name)}\n")


Original: University of Minnesota, Minneapolis, MN 55455, USA
Cleaned: University of Minnesota, Minneapolis, MN 55455

Original: Stockholm University, Stockholm 106 91, Sweden
Cleaned: Stockholm University, Stockholm 106 91

Original: Technical University of Munich, Germany; Institute for Advanced Study, D-85748 Garching, Germany
Cleaned: Technical University of Munich, Germany; Institute for Advanced Study, D-

Original: Institut für Theoretische Physik, Universität Heidelberg, Philosophenweg 16, 69120, Heidelberg, Germany
Cleaned: Institut für Theoretische Physik, Universität Heidelberg, Philosophenweg 16



In [None]:

import spacy

# Load the spaCy model for Named Entity Recognition (NER)
nlp = spacy.load("en_core_web_sm")

def remove_address_spacy(text):
    """Uses spaCy NER to remove location entities from an institution name"""
    doc = nlp(text)
    cleaned_parts = []
    
    for ent in doc.ents:
        if ent.label_ in {"GPE", "LOC"}:  # GPE = Geo-Political Entity (City, Country, etc.)
            continue  # Skip locations
        cleaned_parts.append(ent.text)

    return " ".join(cleaned_parts)

# Example usage
test_institutions = [
    "University of Minnesota, Minneapolis, MN 55455, USA",
    "Stockholm University, Stockholm 106 91, Sweden",
    "Technical University of Munich, Germany; Institute for Advanced Study, D-85748 Garching, Germany",
    "Institut für Theoretische Physik, Universität Heidelberg, Philosophenweg 16, 69120, Heidelberg, Germany"
]

for name in test_institutions:
    print(f"Original: {name}")
    print(f"Cleaned: {remove_address_spacy(name)}\n")

Original: University of Minnesota, Minneapolis, MN 55455, USA
Cleaned: University of Minnesota 55455

Original: Stockholm University, Stockholm 106 91, Sweden
Cleaned: Stockholm University 106 91

Original: Technical University of Munich, Germany; Institute for Advanced Study, D-85748 Garching, Germany
Cleaned: Technical University of Munich Institute for Advanced Study D-85748 Garching

Original: Institut für Theoretische Physik, Universität Heidelberg, Philosophenweg 16, 69120, Heidelberg, Germany
Cleaned: Physik Universität Heidelberg Philosophenweg 16, 69120



In [43]:
from pylatexenc.latex2text import LatexNodes2Text

converter = LatexNodes2Text()

def latex_to_unicode(text):
    """Converts LaTeX special characters to Unicode."""
    return converter.latex_to_text(text)

# Example conversion
test_strings = [
    r"Institut f\"{u}r Theoretische Physik, Universit\"{a}t Heidelberg",
    r"Dipartimento di Fisica ``E.R. Caianiello'', Università degli Studi di Salerno",
    r"Technische Universit\"{a}t M\"{u}nchen",
]

for s in test_strings:
    print(f"Original: {s}")
    print(f"Converted: {latex_to_unicode(s)}\n")


Original: Institut f\"{u}r Theoretische Physik, Universit\"{a}t Heidelberg
Converted: Institut für Theoretische Physik, Universität Heidelberg

Original: Dipartimento di Fisica ``E.R. Caianiello'', Università degli Studi di Salerno
Converted: Dipartimento di Fisica “E.R. Caianiello”, Università degli Studi di Salerno

Original: Technische Universit\"{a}t M\"{u}nchen
Converted: Technische Universität München

