In [None]:
# create subset of dnb records (10000 entries)

import psycopg2
import json
from dotenv import load_dotenv
import os
from typing import Dict, List, Tuple

# Load environment variables (load_dotenv reads them from a .env file, if one
# is found) so the DB_* settings are available to the os.getenv calls below.
load_dotenv()


def connect_to_db() -> psycopg2.extensions.connection:
    """Open a new PostgreSQL connection configured from the environment.

    The connection parameters are read with ``os.getenv`` from DB_HOST,
    DB_PORT, DB_NAME, DB_USER and DB_PASSWORD. A variable that is not set is
    passed to ``psycopg2.connect`` as ``None``.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    # psycopg2 keyword -> name of the environment variable that supplies it
    env_by_param = {
        "host": "DB_HOST",
        "port": "DB_PORT",
        "database": "DB_NAME",
        "user": "DB_USER",
        "password": "DB_PASSWORD",
    }
    settings = {param: os.getenv(env_name) for param, env_name in env_by_param.items()}
    return psycopg2.connect(**settings)


def get_base_conditions() -> str:
    """Return the SQL WHERE fragment shared by the dnb_records queries.

    The fragment keeps records with at most 200 pages whose ``abstract_num``
    is neither NULL nor 0. It carries no leading ``WHERE`` keyword, so callers
    embed it after their own ``WHERE``.
    """
    # One entry per line of the fragment; the empty first entry and the
    # indented last entry reproduce the surrounding line breaks.
    fragment_lines = (
        "",
        "        num_pages <= 200 ",
        "        AND abstract_num IS NOT NULL ",
        "        AND abstract_num != '0'",
        "    ",
    )
    return "\n".join(fragment_lines)


def get_category_counts(cur: psycopg2.extensions.cursor) -> Dict[str, int]:
    """Count the filtered dnb_records rows per top-level DDC category.

    A category is the first character of the ``ddc`` column. All ten digit
    categories are counted with a single grouped query instead of one query
    per digit, so the table is read once and every count comes from the same
    statement.

    Args:
        cur: Open cursor on the database that contains ``dnb_records``.

    Returns:
        Dict with exactly the keys "0" .. "9" (in that order) mapped to the
        number of rows matching ``get_base_conditions()``; categories without
        rows map to 0.
    """
    cur.execute(
        f"""
        SELECT SUBSTRING(ddc FROM 1 FOR 1), COUNT(*)
        FROM dnb_records
        WHERE {get_base_conditions()}
        GROUP BY SUBSTRING(ddc FROM 1 FOR 1)
    """
    )
    # Rows are (first_character, count) pairs. Groups whose first character is
    # not a digit (or is NULL) are simply never looked up below.
    counts_by_prefix = dict(cur.fetchall())

    digits = [str(digit) for digit in range(10)]
    return {digit: counts_by_prefix.get(digit, 0) for digit in digits}


def create_balanced_subset(category_counts: Dict[str, int], total_needed: int) -> Dict[str, int]:
    """Split ``total_needed`` entries across categories as evenly as possible.

    Allocation proceeds in rounds. In each round the outstanding amount is
    divided (integer division, at least 1) among the categories that still
    have entries left; each category receives that share, capped by what it
    has left and by what is still outstanding. Categories that run dry drop
    out, and their shortfall is redistributed in later rounds.

    Args:
        category_counts: Available entries per category.
        total_needed: Total number of entries to allocate.

    Returns:
        Dict with the same keys as ``category_counts`` giving the number of
        entries to take from each category. The sum is ``total_needed``
        unless the categories hold fewer entries in total.
    """
    allocation = dict.fromkeys(category_counts, 0)
    stock = dict(category_counts)
    outstanding = total_needed

    while outstanding > 0:
        # Only categories with entries left take part in this round
        open_categories = [key for key in stock if stock[key] > 0]
        if not open_categories:
            break

        # Equal share for this round; never less than one entry
        share = max(1, outstanding // len(open_categories))

        for key in open_categories:
            grant = min(stock[key], share, outstanding)
            allocation[key] += grant
            stock[key] -= grant
            outstanding -= grant

            if outstanding <= 0:
                break

    return allocation


def create_subset_table(cur: psycopg2.extensions.cursor, entries_needed: Dict[str, int]) -> None:
    """(Re)create ``dnb_records_subset`` with a random sample per DDC category.

    Filtered rows of ``dnb_records`` are numbered in random order within each
    category (first character of ``ddc``); a row is kept when its number does
    not exceed the quota for its category. Rows whose category has no entry
    in ``entries_needed`` find no quota and are not selected.

    The filter comes from ``get_base_conditions()`` so that the sampled rows
    always match the rows that were counted when the quotas were computed.

    Args:
        cur: Open cursor; nothing is committed here.
        entries_needed: Quota per category, keyed by the category character.
            It is passed to the query as a JSON object.
    """
    # Ensure the composite index on the main table exists
    cur.execute("""
        CREATE INDEX IF NOT EXISTS idx_dnb_records_ddc 
        ON dnb_records(ddc, num_pages, abstract_num);
    """)

    # NOTE: because of SELECT *, the helper column row_num becomes a column
    # of the subset table as well.
    cur.execute(
        f"""
        DROP TABLE IF EXISTS dnb_records_subset;
        CREATE TABLE dnb_records_subset AS
        WITH ranked_records AS (
            SELECT *,
                ROW_NUMBER() OVER (
                    PARTITION BY SUBSTRING(ddc FROM 1 FOR 1)
                    ORDER BY RANDOM()
                ) as row_num
            FROM dnb_records
            WHERE {get_base_conditions()}
        )
        SELECT * FROM ranked_records r
        WHERE row_num <= (
            SELECT count::integer
            FROM jsonb_each_text(%s) as t(ddc, count)
            WHERE t.ddc = SUBSTRING(r.ddc FROM 1 FOR 1)
        );
    """,
        (json.dumps(entries_needed),),
    )


def create_subset_indices(cur: psycopg2.extensions.cursor) -> None:
    """Create the indexes on the ``dnb_records_subset`` table.

    Creates ``idx_subset_ddc`` on the ``ddc`` column and ``idx_subset_id`` on
    the ``id`` column in a single execute call. Nothing is committed here.

    Args:
        cur: Open cursor on the database that holds the subset table.
    """
    index_ddl = """
        CREATE INDEX idx_subset_ddc ON dnb_records_subset(ddc);
        CREATE INDEX idx_subset_id ON dnb_records_subset(id);
    """
    cur.execute(index_ddl)


def verify_subset(cur: psycopg2.extensions.cursor, entries_needed: Dict[str, int]) -> None:
    """Print the actual per-category row counts of the subset table.

    For every category (first character of ``ddc``) found in
    ``dnb_records_subset`` one line is printed with the actual count and the
    expected count from ``entries_needed`` (0 when the category has no entry
    there). The function only reports; it does not raise on a mismatch.

    Args:
        cur: Open cursor on the database that holds the subset table.
        entries_needed: Expected number of rows per category.
    """
    distribution_query = """
        SELECT SUBSTRING(ddc FROM 1 FOR 1) as ddc_category, COUNT(*) as count
        FROM dnb_records_subset
        GROUP BY ddc_category
        ORDER BY ddc_category;
    """
    cur.execute(distribution_query)
    rows = cur.fetchall()

    print("\nTatsächliche Verteilung im Subset:")
    for prefix, actual in rows:
        quota = entries_needed.get(prefix, 0)
        print(f"DDC {prefix}: {actual} Einträge (Erwartet: {quota})")

def drop_subset_table(conn: psycopg2.extensions.connection) -> None:
    """Drop the subset table and its two indexes, then commit.

    The indexes ``idx_subset_ddc`` and ``idx_subset_id`` are dropped
    explicitly before the table. Every statement uses ``IF EXISTS``, so the
    call also succeeds when the objects are already gone.

    Args:
        conn: Open connection; the drop is committed on it.
    """
    with conn.cursor() as db_cursor:
        cleanup_sql = """
            DROP INDEX IF EXISTS idx_subset_ddc;
            DROP INDEX IF EXISTS idx_subset_id;
            DROP TABLE IF EXISTS dnb_records_subset;
        """
        db_cursor.execute(cleanup_sql)
    conn.commit()

def force_drop_table(conn: psycopg2.extensions.connection) -> None:
    """Terminate sessions that reference the subset table, then drop it.

    First calls ``pg_terminate_backend`` for every backend in
    ``pg_stat_activity`` (other than the current one) whose query text
    contains ``dnb_records_subset``. Then drops the two subset indexes and
    the table with ``IF EXISTS`` and commits.

    Args:
        conn: Open connection; the drop is committed on it.
    """
    with conn.cursor() as db_cursor:
        # Matching is done on the query text only
        terminate_sql = """
            SELECT pg_terminate_backend(pid)
            FROM pg_stat_activity
            WHERE pid != pg_backend_pid()
              AND query LIKE '%dnb_records_subset%';
        """
        cleanup_sql = """
            DROP INDEX IF EXISTS idx_subset_ddc;
            DROP INDEX IF EXISTS idx_subset_id;
            DROP TABLE IF EXISTS dnb_records_subset;
        """

        db_cursor.execute(terminate_sql)
        db_cursor.execute(cleanup_sql)
    conn.commit()

def main(total_needed: int = 10000) -> None:
    """Build the balanced ``dnb_records_subset`` table and print a report.

    Steps, in order: count all filtered records, count them per DDC category,
    compute the per-category quotas with ``create_balanced_subset``, drop any
    existing subset table via ``force_drop_table``, create the new table and
    its indexes, print the resulting distribution, and commit.

    Args:
        total_needed: Target number of rows in the subset. Defaults to 10000.
    """
    conn = connect_to_db()

    try:
        total_query = f"""
            SELECT COUNT(*) 
            FROM dnb_records 
            WHERE {get_base_conditions()}
        """

        # The cursor is opened inside the try block so the connection is
        # closed even if creating the cursor fails; the context manager closes
        # the cursor itself.
        with conn.cursor() as cur:
            # Total number of filtered records
            cur.execute(total_query)
            total_filtered_records = cur.fetchone()[0]
            print(f"Gesamtzahl der gefilterten Einträge: {total_filtered_records}")

            # Number of records per category
            category_counts = get_category_counts(cur)
            print("\nAnzahl der Einträge pro DDC-Kategorie:")
            for ddc, count in category_counts.items():
                print(f"DDC {ddc}: {count} Einträge")

            # Compute the quota per category
            entries_needed = create_balanced_subset(category_counts, total_needed)

            print("\nBerechnete Anzahl der Einträge pro DDC-Kategorie für das Subset:")
            for ddc, count in entries_needed.items():
                print(f"DDC {ddc}: {count} Einträge")

            # Drop the table if it exists (commits on conn)
            print("\nDropping subset table if it exists...")
            force_drop_table(conn)

            # Store the subset in the database
            print("\nErstelle Subset-Tabelle...")
            create_subset_table(cur, entries_needed)

            print("Erstelle Indizes...")
            create_subset_indices(cur)

            print("Überprüfe Subset...")
            verify_subset(cur, entries_needed)

            # Commit the new table and its indexes
            conn.commit()

    finally:
        conn.close()


# Entry point: call main() when this code runs as the top-level module
# (__name__ is "__main__") rather than being imported.
if __name__ == "__main__":
    main()

In [30]:
# get subset and print schema and example entries
import psycopg2
from dotenv import load_dotenv
import os

# Load environment variables (load_dotenv reads them from a .env file, if one
# is found) so the DB_* settings are available to the os.getenv calls below.
load_dotenv()

def connect_to_db() -> psycopg2.extensions.connection:
    """Open a new PostgreSQL connection configured from the environment.

    The connection parameters are read with ``os.getenv`` from DB_HOST,
    DB_PORT, DB_NAME, DB_USER and DB_PASSWORD. A variable that is not set is
    passed to ``psycopg2.connect`` as ``None``.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    # psycopg2 keyword -> name of the environment variable that supplies it
    env_by_param = {
        "host": "DB_HOST",
        "port": "DB_PORT",
        "database": "DB_NAME",
        "user": "DB_USER",
        "password": "DB_PASSWORD",
    }
    settings = {param: os.getenv(env_name) for param, env_name in env_by_param.items()}
    return psycopg2.connect(**settings)

# Queries used below: column list of the subset table, and five random rows.
schema_query = """
    SELECT column_name, data_type 
    FROM information_schema.columns 
    WHERE table_name = 'dnb_records_subset'
    ORDER BY ordinal_position;
"""
sample_query = """
    SELECT * FROM dnb_records_subset
    ORDER BY RANDOM() 
    LIMIT 5;
"""

# Connect to the database that holds the subset table
subset_conn = connect_to_db()
try:
    # The context manager closes the cursor; the finally clause closes the
    # connection, so both are released even when a query below fails.
    with subset_conn.cursor() as subset_cur:
        # Count the rows in the subset table
        subset_cur.execute("SELECT COUNT(*) FROM dnb_records_subset;")
        total_subset_entries = subset_cur.fetchone()[0]
        print(f"Gesamtzahl der Einträge in der Subset-Tabelle: {total_subset_entries}")

        # Show the table schema as "column_name: data_type" lines
        subset_cur.execute(schema_query)
        print("Schema der Subset-Tabelle:")
        for column_name, data_type in subset_cur.fetchall():
            print(f"{column_name}: {data_type}")

        # Print five random rows, one "column: value" line per column
        print("\n5 Beispieleinträge:")
        subset_cur.execute(sample_query)
        for row in subset_cur.fetchall():
            print("\n---")
            for column, value in zip(subset_cur.description, row):
                print(f"{column.name}: {value}")
finally:
    # Release the connection
    subset_conn.close()

Gesamtzahl der Einträge in der Subset-Tabelle: 15038
Schema der Subset-Tabelle:
id: integer
idn: character varying
title: text
title_additional: text
title_author: text
author_person_id: character varying
author_person_name: text
author_person_role: character varying
author_institution_id: character varying
author_institution_name: text
publication_year: character varying
issn: character varying
keywords: text
country: character varying
language: character varying
ddc: character varying
type_of_material: character varying
university: character varying
year: character varying
urn: character varying
path: character varying
file_size: integer
content_type: character varying
file_extension: character varying
num_pages: integer
url_dnb_archive: character varying
url_resolving_system: character varying
url_publisher: character varying
selection: integer
converted_file: character varying
drive_file_id: character varying
conversion_lock: date
abstract_num: integer
summary_num: integer
abstract

In [None]:
# 🚨 Delete subset table

def connect_to_db() -> psycopg2.extensions.connection:
    """Open a new PostgreSQL connection configured from the environment.

    The connection parameters are read with ``os.getenv`` from DB_HOST,
    DB_PORT, DB_NAME, DB_USER and DB_PASSWORD. A variable that is not set is
    passed to ``psycopg2.connect`` as ``None``.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    # psycopg2 keyword -> name of the environment variable that supplies it
    env_by_param = {
        "host": "DB_HOST",
        "port": "DB_PORT",
        "database": "DB_NAME",
        "user": "DB_USER",
        "password": "DB_PASSWORD",
    }
    settings = {param: os.getenv(env_name) for param, env_name in env_by_param.items()}
    return psycopg2.connect(**settings)

conn = connect_to_db()

# Drop the table if it exists
drop_table_query = "DROP TABLE IF EXISTS dnb_records_subset"

# Execute and commit the drop; always close the connection afterwards so the
# session does not stay open (and keep holding locks) after this cell ran.
try:
    with conn.cursor() as cursor:
        cursor.execute(drop_table_query)
    conn.commit()
finally:
    conn.close()
