## Database Debug

Extract OAI metadata

In [1]:
import xml.etree.ElementTree as ET

def extract_oad_metadata(full_path):
    """Parse a line-delimited OAI-PMH dump into a list of arXiv record dicts.

    Each non-blank line of the file is parsed as a standalone XML document.
    Lines whose root element is not an OAI ``record`` are ignored, and so are
    records that carry no ``header`` or no ``metadata/arXiv`` payload (for
    example records flagged as deleted), because there is nothing to extract
    from them.

    Args:
        :param str full_path: path to the UTF-8 encoded dump file
    Returns:
        :returns: list of dicts with the keys ``identifier``, ``datestamp``,
            ``setSpec``, ``arxiv_id``, ``created``, ``updated``, ``authors``
            (list of "forenames keyname suffix" strings), ``title``,
            ``categories`` (list of str), ``comments``, ``journal_ref``,
            ``doi``, ``license`` and ``abstract``. Scalar fields that are
            missing or empty in the XML are ``None``.
    Raises:
        xml.etree.ElementTree.ParseError: if a non-blank line is not
            well-formed XML.
    """
    namespaces = {
        'oai': 'http://www.openarchives.org/OAI/2.0/',
        'arxiv': 'http://arxiv.org/OAI/arXiv/'
    }
    record_tag = '{http://www.openarchives.org/OAI/2.0/}record'

    def _text(parent, path, default=None):
        """Return the text of the first `path` match under `parent`, else `default`."""
        node = parent.find(path, namespaces)
        if node is None or node.text is None:
            return default
        return node.text

    oai_metadata = []
    # The dump holds one XML document per line, so parse it line by line.
    with open(full_path, 'r', encoding='utf-8') as file:
        for line in file:
            # Blank lines are not XML documents and would make the parser raise.
            if not line.strip():
                continue
            xml_info = ET.fromstring(line)

            # Only <record> elements carry paper metadata.
            if xml_info.tag != record_tag:
                continue

            header = xml_info.find('oai:header', namespaces)
            arxiv = xml_info.find('oai:metadata/arxiv:arXiv', namespaces)
            # Records without a metadata payload have nothing to extract.
            if header is None or arxiv is None:
                continue

            # Build "forenames keyname suffix", leaving out the absent parts.
            authors = []
            for author in arxiv.findall('arxiv:authors/arxiv:author', namespaces):
                name_parts = (
                    _text(author, 'arxiv:forenames', ''),
                    _text(author, 'arxiv:keyname', ''),
                    _text(author, 'arxiv:suffix', ''),
                )
                authors.append(' '.join(part for part in name_parts if part).strip())

            # split() without an argument tolerates repeated whitespace.
            categories = _text(arxiv, 'arxiv:categories', '').split()

            oai_metadata.append({
                "identifier": _text(header, 'oai:identifier'),
                "datestamp": _text(header, 'oai:datestamp'),
                "setSpec": _text(header, 'oai:setSpec'),
                "arxiv_id": _text(arxiv, 'arxiv:id'),
                "created": _text(arxiv, 'arxiv:created'),
                "updated": _text(arxiv, 'arxiv:updated'),
                "authors": authors,
                "title": _text(arxiv, 'arxiv:title'),
                "categories": categories,
                "comments": _text(arxiv, 'arxiv:comments'),
                "journal_ref": _text(arxiv, 'arxiv:journal-ref'),
                "doi": _text(arxiv, 'arxiv:doi'),
                "license": _text(arxiv, 'arxiv:license'),
                "abstract": _text(arxiv, 'arxiv:abstract')
            })
    return oai_metadata

Configuration: database location and OAI table settings

In [2]:
import os
# Directory that holds the SQLite database file (absolute, machine-specific path).
DB_PATH ='/home/jiezi/Code/Github/TrendingPapers/data/'
# File name of the SQLite database inside DB_PATH.
DB_NAME = 'trending_papers.db'
OAI_PAPER_TBL_NM = "oai_paper_pool"  # table for preprint paper metadata (batch through OAI)
OAI_PAPER_TBL_KEY = 'identifier'   # PK column for OAI_PAPER_TBL_NM

In [3]:
import json
import sqlite3
import pandas as pd
import logging

# Configure root logging at INFO level with a timestamped format, then create
# the module-level logger used by the database helpers below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def sqlite_connect(db_name):
    """Open a connection to the SQLite database `db_name`.

    Args:
        :param str db_name: path of the database file (or ':memory:')
    Returns:
        :returns: the open sqlite3 connection, or None when connecting fails
            (the error is printed rather than raised)
    """
    connection = None
    try:
        connection = sqlite3.connect(db_name)
    except sqlite3.Error as err:
        print(f"Error connecting to database: {err}")
    return connection

def df_to_sqlite(
        df, 
        table_name, 
        db_name, 
        id_key=None,
        if_exists='append' 
        ):   
    """import pandas DataFrame to SQLite database
    Args:
        :param pd.DataFrame df: DataFrame to import (never modified; a copy is used)
        :param str table_name: table name to import to
        :param str db_name: database name
        :param str id_key: primary key for the table
        :param str if_exists: 'append' or 'replace'
    Returns:
        :returns: None
    Note:
        - If 'id_key' is provided and the table is appended to, rows whose key already exists in the table are skipped.
        - If 'if_exists' is set to 'replace', the table is replaced by the full DataFrame; no row filtering or
          column alignment against the old table is applied.
        - If 'if_exists' is set to 'append', the function will append the new data to the existing table.
        - When appending, columns that are not in the table are dropped and columns missing from the
          DataFrame are filled with None.
        - Object columns are stored as text; columns holding dicts/lists are JSON-encoded. Missing values
          (None / NaN) are kept as NULL instead of being turned into the text 'None'.
        - Automatically create table if not exist.
        - Errors are logged and printed, not raised. Nothing happens if the connection cannot be opened.
        - 'table_name' and 'id_key' are interpolated into SQL (identifiers cannot be bound as parameters),
          so they must come from trusted code.
    """
    conn = sqlite_connect(db_name)
    if conn is None:
        return

    def _is_missing(value):
        """True for None and float NaN, the values that must stay NULL in SQLite."""
        return value is None or (isinstance(value, float) and value != value)

    replacing = if_exists == 'replace'
    df_converted = df.copy()

    try:
        # Check if the table exists (the name is bound as a parameter here).
        cursor = conn.cursor()
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        table_exists = cursor.fetchone() is not None
        # Filtering/alignment against the stored table only makes sense when
        # rows are added to it; with 'replace' the old table is dropped anyway.
        align_with_table = table_exists and not replacing

        # 1. Convert object columns to text; dict/list columns become JSON.
        # This must happen before the table is created so the column types match.
        for col in df_converted.columns:
            if df_converted[col].dtype == 'object':
                column = df_converted[col]
                # Decide once per column instead of rescanning it for every value.
                if any(isinstance(v, (dict, list)) for v in column):
                    def to_text(v):
                        return json.dumps(v, ensure_ascii=False)
                else:
                    to_text = str
                df_converted[col] = column.apply(
                    lambda v, to_text=to_text: None if _is_missing(v) else to_text(v)
                )

        # Create table if it doesn't exist
        if not table_exists:
            create_table_from_df(conn, df_converted, table_name, id_key)

        # Column names of the stored table, in declaration order.
        cursor.execute(f"PRAGMA table_info({table_name})")
        table_columns = [row[1] for row in cursor.fetchall()]

        if id_key and align_with_table:
            # Fetch existing IDs from the database
            cursor.execute(f"SELECT DISTINCT {id_key} FROM {table_name}")
            existing_ids = {row[0] for row in cursor.fetchall()}

            # Filter out rows with IDs that already exist
            df_converted = df_converted[~df_converted[id_key].isin(existing_ids)]

        if df_converted.empty and table_exists:
            print(f"No new records to insert into '{table_name}' (based on '{id_key}').")
            return

        if align_with_table:
            # Keep only columns the table knows about; copy so the column
            # assignments below do not write into a view of the filtered frame.
            df_converted = df_converted.loc[:, df_converted.columns.isin(table_columns)].copy()

            # Add missing columns to the DataFrame and set values to None
            for col in table_columns:
                if col not in df_converted.columns:
                    df_converted[col] = None

            # Reorder DataFrame columns to match the table's column order
            df_converted = df_converted[table_columns]

        # 2. Explicit SQLite column types, used by to_sql when it has to create the table.
        dtype_mapping = {}
        for col_name, col_type in df_converted.dtypes.items():
            if col_name == id_key:
                dtype_mapping[col_name] = "TEXT PRIMARY KEY"  # Assuming ID key is text
            elif 'int' in str(col_type):
                dtype_mapping[col_name] = "INTEGER"
            elif 'float' in str(col_type):
                dtype_mapping[col_name] = "REAL"
            else:
                dtype_mapping[col_name] = "TEXT"

        df_converted.to_sql(
            table_name, conn, if_exists=if_exists, index=False,
            dtype=dtype_mapping
            )
        print(f"Data successfully written to table '{table_name}' in '{db_name}'")
    except Exception as e:
        logger.error(f"Error writing to database: {e}")
        print(f"Error writing to database: {e}")
    finally:
        conn.close()

def create_table_from_df(conn, df, table_name, id_key):
    """Create `table_name` in the SQLite database with one column per DataFrame column.

    The `id_key` column is declared as "TEXT PRIMARY KEY"; other columns map to
    INTEGER / REAL when their dtype name contains 'int' / 'float', and to TEXT
    otherwise. The statement is committed and a confirmation is printed.
    """
    def _sqlite_type(column, dtype):
        if column == id_key:
            return "TEXT PRIMARY KEY"  # Assuming ID key is text
        dtype_name = str(dtype)
        if 'int' in dtype_name:
            return "INTEGER"
        if 'float' in dtype_name:
            return "REAL"
        return "TEXT"

    # Column names are wrapped in double quotes so unusual names stay valid SQL.
    column_defs = ', '.join(
        f'"{column}" {_sqlite_type(column, dtype)}'
        for column, dtype in df.dtypes.items()
    )

    cur = conn.cursor()
    cur.execute(f"CREATE TABLE {table_name} ({column_defs})")
    conn.commit()
    print(f"Table '{table_name}' created successfully.")

In [4]:
def deduplicate_list_of_dicts(data, key):
    """Drop dicts whose `key` value was already seen, keeping the first occurrence.

    The input order is preserved and a new list is returned.
    """
    # A dict keeps insertion order and gives constant-time "already seen" checks;
    # setdefault stores an item only the first time its value shows up.
    first_by_value = {}
    for entry in data:
        first_by_value.setdefault(entry[key], entry)
    return list(first_by_value.values())

In [None]:
import pandas as pd
from datetime import datetime, timedelta

# Today's date as 'YYYY-MM-DD'.
# NOTE(review): CURRENT_DT is not used below; 'insert_dt' is set to the literal
# 'new' instead. Confirm whether that is intentional for this debug run.
CURRENT_DT = datetime.today().strftime('%Y-%m-%d')

# Parse the OAI dump, keep the first record per identifier, and append the
# result to the OAI paper table.
full_path = "../data/stat_2025-02-05_2025-02-06.xml"
oai_metadata = extract_oad_metadata(full_path)
cleaned_oai_metadata = deduplicate_list_of_dicts(oai_metadata, OAI_PAPER_TBL_KEY)
df = pd.DataFrame(cleaned_oai_metadata)
df['insert_dt'] = 'new'
df_to_sqlite(
    df, 
    table_name = OAI_PAPER_TBL_NM, 
    db_name = os.path.join(DB_PATH, DB_NAME),
    if_exists = 'append', 
    id_key = OAI_PAPER_TBL_KEY)

## Huggingface API Issue

In [42]:
import random
import requests
from requests.adapters import Retry, HTTPAdapter
from datetime import datetime
import logging

from json_repair import repair_json  # https://github.com/mangiucugna/json_repair/
from firecrawl import FirecrawlApp  # pip install firecrawl-py https://github.com/mendableai/firecrawl

# Configure INFO-level logging with a timestamped format and create the
# module-level logger used by HuggingFaceKit.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Browser User-Agent strings; HuggingFaceKit picks one at random per instance.
_useragent_list = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.62',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/111.0'
]

class HuggingFaceKit:
    """Client for the Hugging Face daily-papers API with an optional FireCrawl fallback.

    GET requests go through a `requests.Session` that retries on HTTP
    429/500/502/503/504. When the request still fails and a FireCrawl API key
    was supplied, the same URL is scraped through FireCrawl instead.
    """

    def __init__(self, max_retries_cnt=3, firecrawl_api_key=None):
        """Set up the retrying HTTP session and, if a key is given, the FireCrawl client.

        Args:
            :param int max_retries_cnt: total number of retries per request
            :param str firecrawl_api_key: FireCrawl API key; None disables the fallback
        """
        self.base_url = "https://huggingface.co/api/daily_papers"
        self.headers = {
            "User-Agent": random.choice(_useragent_list)
        }
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries_cnt,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1,
            allowed_methods=["GET"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

        # Always define the attribute so the fallback path can test for it
        # instead of failing with AttributeError when no key was supplied.
        self.firecrawl = None
        if firecrawl_api_key is not None:
            self.firecrawl = FirecrawlApp(api_key=firecrawl_api_key)

    def _build_url(self, date_str=None, max_cnt=None):
        """Return the API URL with the `date` / `limit` query parameters that were given."""
        query = [
            f"{name}={value}"
            for name, value in (("date", date_str), ("limit", max_cnt))
            if value is not None
        ]
        if not query:
            return self.base_url
        return f"{self.base_url}?{'&'.join(query)}"

    def _fetch_via_firecrawl(self, url):
        """Scrape `url` through FireCrawl and return the paper dicts ([] on any failure)."""
        # Local import: this notebook cell does not import json itself.
        import json

        if getattr(self, "firecrawl", None) is None:
            print("FireCrawl fallback unavailable: no firecrawl_api_key was provided.")
            return []
        try:
            response = self.firecrawl.scrape_url(url=url, params={
                'formats': [ 'markdown', 'links' ],
                'excludeTags': [ '.ad', 'script', '#footer' ]
            })
            # The JSON payload arrives as markdown text: drop the literal "\n"
            # sequences and the remaining backslashes before repairing/parsing it.
            md = response.get('markdown').replace("\\n", " ").replace("\\", "")
            data = json.loads(repair_json(md))
            if not data:
                return []
            hf_paper_dicts = []
            for item in data:
                paper_metadata = item.get('paper')
                # Map a markdown-escaped '\_id' key back to '_id'.
                rvsd_paper_metadata = {{'\\_id':'_id'}.get(key, key):
                                       value for key, value in paper_metadata.items()}
                hf_paper_dicts.append(rvsd_paper_metadata)
            return hf_paper_dicts
        except Exception as e:
            print(f"Unexpected error: {e}")
            return []

    def fetch_daily_papers(self, date_str=None, max_cnt=None):
        """Fetch daily papers, optionally for one date and/or limited in count.

        Args:
            :param str date_str: value for the `date` query parameter; None omits it
            :param int max_cnt: value for the `limit` query parameter; None omits it
        Returns:
            :returns: list with the 'paper' entry of every item in the response;
                an empty list when nothing was received or every attempt failed.
                Errors are printed, not raised.
        """
        logger.info(f"正在获取 {date_str} 的论文数据")

        # Build the API URL
        url = self._build_url(date_str, max_cnt)

        try:
            # A timeout keeps a stalled connection from blocking forever.
            response = self.session.get(url, headers=self.headers, timeout=10)
            response.raise_for_status()
            data = response.json()

            if not data:
                print("No data received from API.")
                return []
            return [item.get('paper') for item in data]

        except requests.exceptions.RequestException as e:
            # Request-level failure (after retries): try the FireCrawl fallback.
            print(f"Error fetching papers through API: {e}\nSwitch to FireCrawl:\n")
            return self._fetch_via_firecrawl(url)
        except Exception as e:
            print(f"Unexpected error: {e}")
            return []

In [None]:
import os

# The original call left `firecrawl_api_key=` without a value, which is a
# syntax error. Read the key from the environment so no secret lives in the
# notebook; when the variable is unset the FireCrawl fallback stays disabled.
hf_kit = HuggingFaceKit(
    max_retries_cnt=3,
    firecrawl_api_key=os.environ.get("FIRECRAWL_API_KEY"),
)
papers = hf_kit.fetch_daily_papers()
if papers:
    print(f"Successfully fetched {len(papers)} papers.")
else:
    print("Failed to fetch papers after multiple retries.")

In [None]:
# Show the fetched papers as the cell output.
papers