In [None]:
from io import StringIO
import json
import logging
from time import sleep

from bs4 import BeautifulSoup
import pandas as pd
import requests
from tqdm.notebook import tqdm
from urllib.parse import urljoin, unquote

from constants import ALL_COLUMNS, IRRELEVANT_COLUMNS, RENAME_COLUMNS, SHOULD_BE_NOTEMPTY_COLS

In [None]:
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("leopoldina_scraper.log", encoding='utf-8')]
)

In [None]:
BASE_URL = "https://de.wikipedia.org"
BASE_TITLE = "Liste_der_Mitglieder_der_Deutschen_Akademie_der_Naturforscher_Leopoldina"
API_URL = "https://de.wikipedia.org/w/api.php"
YEARS = range(1885, 1951)

all_tables = {}

In [None]:
logging.info("Starting to scrape Leopoldina members data")
for year in tqdm(YEARS):
    title = f'{BASE_TITLE}/{year}'
    logging.info(f"{year}: Fetching page via Wikipedia API...")
    
    params = {
        "action": "parse",
        "page": title,
        "format": "json",
        "prop": "text"
    }

    try:
        response = requests.get(API_URL, params=params)
        response.raise_for_status()
        data = response.json()
        
        if "error" in data:
            logging.warning(f"\t{year}: Page not found (API returned error).")
            continue
        
        html_content = data["parse"]["text"]["*"]
        soup = BeautifulSoup(html_content, "html.parser")

        tables = soup.select("div.mw-parser-output > table")

        if tables:
            if len(tables) > 1:
                logging.warning(f"\t{year}: More than one table found, using the first one.")
            
            # HTML table to DataFrame
            table = tables[0]
            table_html = str(table)
            df = pd.read_html(StringIO(table_html))[0]

            # Extract links for the "Name" column (second column)
            name_links = []
            for row in table.select("tr")[1:]: # Skip header row
                cells = row.find_all("td")
                if len(cells) > 1:  # Ensure the second column exists
                    name_cell = cells[1]  # Second column
                    if name_cell.find("a"):
                        link = name_cell.find("a").get("href")
                        if link and "redlink=1" not in link: # Ignore redlinks (Missing pages)
                            full_link = urljoin(BASE_URL, link)
                            decoded_link = unquote(full_link) # Decode URL
                            name_links.append(full_link)
                        else:
                            name_links.append(None)
                    else:
                        name_links.append(None)
                else:
                    name_links.append(None)

            # Add the links as a new column in the DataFrame
            df["Link"] = name_links

            all_tables[year] = df
            logging.info(f"\t{year}: Table found with {len(df)} rows.")
        else:
            logging.warning(f"\t{year}: No table found on the page.")

    except Exception as e:
        logging.error(f"{year}: Failed due to {e}")

    sleep(0.5) # Politeness to the server


In [None]:
logging.info("Verifying all years...")
# Check if all tables are present
if len(all_tables) != len(YEARS):
    missing_years = set(YEARS) - set(all_tables.keys())
    logging.warning(f"Missing years: {missing_years}")

logging.info("Comparing columns of all tables...")
first_table_columns = list(next(iter(all_tables.values())).columns)

# Check for column differences
for year, table in all_tables.items():
    current_columns = list(table.columns)
    if current_columns != first_table_columns:
        missing_cols = set(first_table_columns) - set(current_columns)
        exceeding_cols = set(current_columns) - set(first_table_columns)
        if missing_cols:
            logging.warning(f"\t{year}: Missing columns {missing_cols}")
        if exceeding_cols:
            logging.warning(f"\t{year}: Exceeding columns {exceeding_cols}")

logging.info("Comparing columns of all renamed tables...")
# Rename and verify columns
all_tables_renamed = {}
for year, table in all_tables.items():
    # Rename columns
    all_tables_renamed[year] = table.rename(columns=RENAME_COLUMNS)
    # Verify renaming
    current_columns = list(all_tables_renamed[year].columns)

    missing_cols =  set(ALL_COLUMNS) - set(current_columns)
    exceeding_cols = set(current_columns) - set(ALL_COLUMNS)

    if missing_cols:
        logging.warning(f"\t{year}: Missing columns: {missing_cols}")
    if exceeding_cols:
        logging.warning(f"\t{year}: Exceeding columns: {exceeding_cols}")

In [None]:
# Concatenate all tables into a single DataFrame
combined_df = pd.concat(all_tables_renamed.values(), keys=all_tables_renamed.keys(), names=["Year", "Row"])

# Drop irrelevant columns
logging.info(f"Dropping irrelevant columns: {IRRELEVANT_COLUMNS}")
combined_df.drop(columns=IRRELEVANT_COLUMNS, inplace=True)

# Log the length of the DataFrame
logging.info(f"Combined dataset length: {len(combined_df)} rows")

# Log the number of empty values per column
logging.info("Logging the number of empty values per column:")
for column in combined_df.columns:
    empty_count = combined_df[column].isna().sum()
    logging.info(f"\tColumn '{column}': {empty_count} empty values")

# Save the combined dataset to a CSV file
logging.info("Saving combined dataset to CSV...")
combined_df.to_csv("../data/leopoldina_dataset_1885_1950.csv", index=True, encoding="utf-8")

# Missing Data

In [None]:
# Inpute missing values
df = pd.read_csv("../data/leopoldina_dataset_1885_1950.csv", encoding="utf-8")

In [None]:
# Create json for missing data
missing_data = {}
for column in SHOULD_BE_NOTEMPTY_COLS:
    for name in list(df[df[column].isna()]["Name"]):
        if name not in missing_data:
            missing_data[name] = {}
        missing_data[name].update({column: ""})

In [None]:
# Save the missing_data dictionary to a JSON file
with open("../data/missing_data.json", "w", encoding="utf-8") as json_file:
    json.dump(missing_data, json_file, ensure_ascii=False, indent=4)