# Viewers

In [9]:
from pathlib import Path
import duckdb
import pandas as pd
from project_config import DUCKDB_FILE


def describe_duckdb_table(db_path: str | Path, table_name: str) -> pd.DataFrame:
    con = duckdb.connect(database=db_path, read_only=True)

    columns_df = con.execute(f"PRAGMA table_info('{table_name}')").fetchdf()
    columns = columns_df["name"].tolist()

    summary_rows = []

    for col in columns:
        row = con.execute(
            f"""
            SELECT
                '{col}' AS column,
                COUNT("{col}") AS non_null,
                COUNT(*) - COUNT("{col}") AS nulls,
                MIN(LENGTH(CAST("{col}" AS VARCHAR))) AS min_len,
                MAX(LENGTH(CAST("{col}" AS VARCHAR))) AS max_len,
                AVG(LENGTH(CAST("{col}" AS VARCHAR))) AS avg_len
            FROM {table_name}
        """
        ).fetchone()
        summary_rows.append(row)

    summary_df = pd.DataFrame(
        summary_rows,
        columns=["column", "non_null", "nulls", "min_len", "max_len", "avg_len"],
    )
    con.close()
    return summary_df


# Example usage:
summary = describe_duckdb_table(DUCKDB_FILE, "job_urls")
print(summary)

        column  non_null  nulls  min_len  max_len     avg_len
0          url        19      0       47      241  122.684211
1      company        19      0        3       18    9.578947
2    job_title        19      0       19       71   40.684211
3  source_file        19      0       67       67   67.000000
4        stage        19      0       13       13   13.000000
5    timestamp        19      0       26       26   26.000000


In [22]:
from pathlib import Path
import duckdb
from project_config import DUCKDB_FILE as db_path


def describe_duckdb_table(db_path: str | Path, table_name: str) -> pd.DataFrame:
    con = duckdb.connect(database=db_path, read_only=True)

    columns_df = con.execute(f"PRAGMA table_info('{table_name}')").fetchdf()
    columns = columns_df["name"].tolist()

    summary_rows = []

    for col in columns:
        row = con.execute(
            f"""
            SELECT
                '{col}' AS column,
                COUNT("{col}") AS non_null,
                COUNT(*) - COUNT("{col}") AS nulls,
                MIN(LENGTH(CAST("{col}" AS VARCHAR))) AS min_len,
                MAX(LENGTH(CAST("{col}" AS VARCHAR))) AS max_len,
                AVG(LENGTH(CAST("{col}" AS VARCHAR))) AS avg_len
            FROM {table_name}
        """
        ).fetchone()
        summary_rows.append(row)

    summary_df = pd.DataFrame(
        summary_rows,
        columns=["column", "non_null", "nulls", "min_len", "max_len", "avg_len"],
    )
    con.close()
    return summary_df


def print_columns(
    db_path: Path | str, table_name: str, columns: list[str], limit: int = 10
):
    con = duckdb.connect(str(db_path))

    col_str = ", ".join(columns)
    query = f"SELECT {col_str} FROM {table_name} LIMIT {limit}"

    df = con.execute(query).fetchdf()
    print(df)

    con.close()


summary = describe_duckdb_table(db_path, "job_postings")
print(summary)

print_columns(db_path, "job_postings", ["url", "content"])

         column  non_null  nulls  min_len  max_len      avg_len
0           url        38      0       44      377   145.631579
1        status        38      0        7        7     7.000000
2       message        30      8       37       37    37.000000
3     job_title        38      0       18       71    39.973684
4       company        38      0        3       25     9.921053
5      location        27     11       10      693    74.592593
6   salary_info        25     13       20      323    81.840000
7   posted_date         9     29        0       19     8.777778
8       content        38      0      229     9045  3754.789474
9   source_file        38      0       61       61    61.000000
10        stage        38      0       13       13    13.000000
11    timestamp        38      0       26       26    26.000000
                                                 url  \
0  https://www.google.com/about/careers/applicati...   
1  https://www.capitalonecareers.com/job/-/-/234/...   


# Update Tables

In [11]:
import pandas as pd
import duckdb
from pathlib import Path
from db_io.db_insert import align_df_with_schema
from db_io.db_schema_registry import DUCKDB_SCHEMA_REGISTRY
from db_io.pipeline_enums import TableName
from project_config import DB_DIR, DUCKDB_FILE


def update_table_from_excel(
    excel_path: Path,
    table_name: TableName,
    update_columns: list[str],
    primary_keys: list[str] | None = None,
):
    """
    Update specified columns in a DuckDB table using an Excel file.

    Args:
        excel_path (Path): Path to the Excel file.
        table_name (TableName): Enum of the DuckDB table to update.
        update_columns (list[str]): List of columns to update.
        primary_keys (list[str] | None): List of primary key columns for identifying rows.
                                         If None, will be fetched from the schema registry.

    Raises:
        ValueError: If required columns are missing or data types mismatch.
    """
    # Path to the DuckDB database
    db_path = DUCKDB_FILE

    # Fetch primary keys from the schema registry if not provided
    if primary_keys is None:
        schema = DUCKDB_SCHEMA_REGISTRY.get(table_name)
        if not schema:
            raise ValueError(f"Table '{table_name}' not found in schema registry.")
        primary_keys = schema.primary_keys

    # Load the Excel file
    df = pd.read_excel(excel_path)

    # Validate that primary keys and update columns exist in the DataFrame
    missing_columns = [
        col for col in primary_keys + update_columns if col not in df.columns
    ]
    if missing_columns:
        raise ValueError(f"Missing columns in the Excel file: {missing_columns}")

    # Ensure column alignment
    schema = DUCKDB_SCHEMA_REGISTRY[table_name]
    df = align_df_with_schema(df, schema.column_order)

    # Connect to DuckDB
    con = duckdb.connect(database=str(db_path))

    # Iterate over each row and perform the update
    for _, row in df.iterrows():
        # Construct WHERE clause using all primary keys with proper data type handling
        where_clause = " AND ".join(
            [
                f"TRIM(LOWER(url)) = TRIM(LOWER('{row['url']}'))",
                f"iteration = {int(row['iteration'])}",
                f"TRIM(LOWER(version)) = TRIM(LOWER('{row['version']}'))",
                f"TRIM(LOWER(llm_provider)) = TRIM(LOWER('{row['llm_provider']}'))",
            ]
        )

        # Construct the SET clause for the columns to be updated
        set_clause = ", ".join([f"{col} = '{row[col]}'" for col in update_columns])

        # Diagnostic SELECT Query to Check Matching Rows
        select_query = f"""
            SELECT *
            FROM {table_name.value}
            WHERE {where_clause}
        """
        print(f"SELECT Query:\n{select_query}\n")
        try:
            result_df = con.execute(select_query).fetchdf()
            print(f"Query Result:\n{result_df}\n")
        except Exception as e:
            print(f"❌ Error executing SELECT query for row {row}: {e}")

        # Construct the final UPDATE SQL query
        update_query = f"""
            UPDATE {table_name.value}
            SET {set_clause}
            WHERE {where_clause}
        """

        # Execute the update
        print(f"Executing UPDATE Query:\n{update_query}\n")
        try:
            con.execute(update_query)
        except Exception as e:
            print(f"❌ Error executing UPDATE query for row {row}: {e}")

    print(f"✅ Processed {len(df)} rows in the '{table_name.value}' table.")
    con.close()


# Example usage
excel_file = DB_DIR / "records_to_update.xlsx"
table = TableName.PIPELINE_CONTROL
update_columns = ["status"]

update_table_from_excel(
    excel_path=excel_file, table_name=table, update_columns=update_columns
)

SELECT Query:

            SELECT *
            FROM pipeline_control
            WHERE TRIM(LOWER(url)) = TRIM(LOWER('https://boards.greenhouse.io/gleanwork/jobs/4425502005?source=LinkedIn')) AND iteration = 0 AND TRIM(LOWER(version)) = TRIM(LOWER('original')) AND TRIM(LOWER(llm_provider)) = TRIM(LOWER('openai'))
        

Query Result:
                                                 url  iteration stage  \
0  https://boards.greenhouse.io/gleanwork/jobs/44...          0  None   

                                         source_file           timestamp  \
0  /home/xzhang/dev/job_bot/input_output/input/jo... 2025-05-08 14:25:53   

    version llm_provider     status last_stage  is_active notes  
0  original       openai  completed       None       True        

Executing UPDATE Query:

            UPDATE pipeline_control
            SET status = 'completed'
            WHERE TRIM(LOWER(url)) = TRIM(LOWER('https://boards.greenhouse.io/gleanwork/jobs/4425502005?source=LinkedIn')) AND it

in cli alone:<br><br>

UPDATE pipeline_control AS pc
SET status = ud.status
FROM update_data AS ud
WHERE pc.url = ud.url
  AND pc.iteration = ud.iteration
  AND pc.version = ud.version
  AND pc.llm_provider = ud.llm_provider;
