In [12]:
import pandas as pd
import psycopg
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, as_completed


@contextmanager
def get_connection(config):
    """
    Context manager to handle PostgreSQL connection lifecycle.
    Ensures that the connection is closed after use.

    Only failures raised while opening the connection are reported as
    connection errors. Exceptions raised by the caller inside the ``with``
    body propagate unchanged (the connection is still closed first).

    :param config: Dictionary containing database connection parameters
        ("dbname", "user", "password", "host", "port").
    :raises psycopg.Error: If the connection cannot be established.
    """
    # Connect outside the try/finally that wraps the yield: with
    # @contextmanager, an exception raised in the caller's ``with`` body is
    # re-raised at the ``yield``, so an ``except`` around the yield would
    # mislabel query errors as connection errors.
    try:
        conn = psycopg.connect(
            dbname=config["dbname"],
            user=config["user"],
            password=config["password"],
            host=config["host"],
            port=config["port"],
        )
    except psycopg.Error as e:
        print(f"Error connecting to database {config['dbname']}: {e}")
        raise

    try:
        yield conn
    finally:
        conn.close()


def fetch_column_names_for_query(conn, query, params=None):
    """
    Fetch column names from a query result. If the table doesn't exist, handle gracefully.

    The query is wrapped as a subquery with ``LIMIT 0`` so that only the
    result metadata is produced, not the rows.

    :param conn: A psycopg connection object.
    :param query: SQL query to analyze. A trailing ``;`` is tolerated.
    :param params: Optional query parameters, needed when ``query`` contains
        placeholders.
    :return: List of column names or None if the table does not exist.
    :raises Exception: Any other error raised while executing the probe query
        is printed and re-raised.
    """
    # A statement terminator inside the parentheses would be a syntax error,
    # so drop any trailing ";" (and surrounding whitespace) before wrapping.
    inner_query = query.strip().rstrip(";").rstrip()

    with conn.cursor() as cursor:
        try:
            cursor.execute(
                f"SELECT * FROM ({inner_query}) AS subquery LIMIT 0;", params
            )
            return [desc[0] for desc in cursor.description]
        except psycopg.errors.UndefinedTable as e:
            print(f"Table not found in database: {e}")
            return None
        except Exception as e:
            print(f"Error executing query to fetch column names: {e}")
            raise


def fetch_data_in_chunks(conn, query, chunk_size=10000, params=None):
    """
    Run a query and hand its rows back in batches.

    This is a generator: the query is executed when iteration starts, and
    the cursor stays open until the generator is exhausted or closed.

    :param conn: A psycopg connection object.
    :param query: SQL query to execute.
    :param chunk_size: Maximum number of rows per yielded batch.
    :param params: Optional query parameters.
    :return: A generator yielding non-empty batches of rows, each obtained
        from ``cursor.fetchmany(chunk_size)``.
    """
    with conn.cursor() as cur:
        cur.execute(query, params)
        batch = cur.fetchmany(chunk_size)
        # An empty batch signals that the result set is exhausted.
        while batch:
            yield batch
            batch = cur.fetchmany(chunk_size)


def process_database(config, user_query, chunk_size=10000, params=None):
    """
    Process a single database by executing the query and fetching results.

    Errors are handled on a best-effort basis: any exception is printed and
    whatever DataFrames were built before the failure are still returned.

    :param config: Database configuration dictionary.
    :param user_query: SQL query to execute.
    :param chunk_size: Number of rows to fetch per chunk.
    :param params: Optional query parameters.
    :return: List of DataFrames for the processed database (one per chunk),
        each with an extra "database" column holding ``config["dbname"]``.
        Empty if the table does not exist, no rows came back, or an error
        occurred before any chunk was read.
    """
    dfs = []
    try:
        with get_connection(config) as conn:
            column_names = fetch_column_names_for_query(conn, user_query)
            if column_names is None:
                print(f"Skipping {config['dbname']} as table does not exist.")
                return []  # Skip this database if table doesn't exist

            # The chunk generator never yields an empty chunk, so "no data"
            # has to be detected by noticing that the loop body never ran.
            received_rows = False

            # Fetch data in chunks
            for chunk in fetch_data_in_chunks(conn, user_query, chunk_size, params):
                if not chunk:
                    continue
                received_rows = True
                if len(chunk[0]) != len(column_names):
                    print(f"Column mismatch in {config['dbname']}")
                    continue
                df = pd.DataFrame(chunk, columns=column_names)
                df["database"] = config["dbname"]  # Add database name as a column
                dfs.append(df)

            if not received_rows:
                print(f"No data returned from {config['dbname']}")
    except Exception as e:
        print(f"Error processing database {config['dbname']}: {e}")

    return dfs


def run_query_with_dynamic_columns(
    db_configs, user_query, chunk_size=10000, params=None
):
    """
    Execute a user query across multiple databases concurrently and return the results as a pandas DataFrame.

    One worker thread is started per database configuration. Results are
    combined in the order of ``db_configs`` so the output does not depend on
    which database happens to finish first.

    :param db_configs: List of database connection configurations. May be empty.
    :param user_query: SQL query to execute.
    :param chunk_size: Number of rows to fetch per chunk.
    :param params: Optional query parameters.
    :return: Combined DataFrame with results from all databases; an empty
        DataFrame if nothing was retrieved.
    """
    configs = list(db_configs)
    all_dfs = []

    # ThreadPoolExecutor raises ValueError for max_workers <= 0, so only
    # create a pool when there is at least one database to query.
    if configs:
        with ThreadPoolExecutor(max_workers=len(configs)) as executor:
            # Submit each database processing task to the executor
            futures = [
                executor.submit(
                    process_database, config, user_query, chunk_size, params
                )
                for config in configs
            ]

            # Collect results in submission order; all tasks are already
            # running, so waiting in order does not serialize the work.
            for db_config, future in zip(configs, futures):
                try:
                    result = future.result()
                except Exception as e:
                    print(f"Error processing {db_config['dbname']}: {e}")
                    continue
                if result:  # Only add results if the database returned data
                    all_dfs.extend(result)

    # Combine all DataFrames into one
    combined_df = pd.concat(all_dfs, ignore_index=True) if all_dfs else pd.DataFrame()

    if combined_df.empty:
        print("No data retrieved from the databases.")

    return combined_df


# Example usage: one configuration list per target database.
# NOTE(review): credentials are hard-coded here for a local demo; confirm
# they are not reused anywhere that matters before sharing this notebook.
db_configs = [
    {
        "dbname": "Employees",
        "user": "postgres",
        "password": "root",
        "host": "localhost",
        "port": "5432",
    },
]
db_configs_2 = [
    {
        "dbname": "Store",
        "user": "postgres",
        "password": "root",
        "host": "localhost",
        "port": "5432",
    },
]

# Query for Employees database (no placeholders, so no parameters)
user_query = "SELECT * FROM employees ORDER BY emp_no ASC"
params = None

# Query for Store database
user_query_2 = "SELECT * FROM customers"

# Fetch results for each query. The two calls run one after the other;
# concurrency only applies across the configurations passed to a single call.
result_df = run_query_with_dynamic_columns(
    db_configs, user_query, chunk_size=10000, params=params
)
result_df_2 = run_query_with_dynamic_columns(
    db_configs_2, user_query_2, chunk_size=10000, params=params
)

# Print results
print(result_df)
print(result_df_2)

        emp_no  birth_date first_name last_name gender   hire_date   database
0        10001  1953-09-02     Georgi   Facello      M  1986-06-26  Employees
1        10002  1964-06-02    Bezalel    Simmel      F  1985-11-21  Employees
2        10003  1959-12-03      Parto   Bamford      M  1986-08-28  Employees
3        10004  1954-05-01  Chirstian   Koblick      M  1986-12-01  Employees
4        10005  1955-01-21    Kyoichi  Maliniak      M  1989-09-12  Employees
...        ...         ...        ...       ...    ...         ...        ...
300019  499995  1958-09-24     Dekang  Lichtner      F  1993-01-12  Employees
300020  499996  1953-03-07       Zito      Baaz      M  1990-09-27  Employees
300021  499997  1961-08-03    Berhard    Lenart      M  1986-04-21  Employees
300022  499998  1956-09-05   Patricia   Breugel      M  1993-10-13  Employees
300023  499999  1958-05-01     Sachin   Tsukuda      M  1997-11-30  Employees

[300024 rows x 7 columns]
       customerid firstname    lastna

In [25]:
import logging
from contextlib import contextmanager
from typing import Dict, Iterator, List, Optional, Tuple

import pandas as pd
import psycopg
from fuzzywuzzy import process

# Configure logging: set up the root logger at INFO level and create a
# logger named after this module for the functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@contextmanager
def get_connection(config: Dict[str, str]) -> Iterator[psycopg.Connection]:
    """Context manager to handle PostgreSQL connection lifecycle.

    Opens a connection from ``config`` ("dbname", "user", "password", "host",
    "port"), yields it, and always closes it when the ``with`` block exits.

    Only a failure to open the connection is logged as a connection error.
    Exceptions raised by the caller inside the ``with`` body propagate
    unchanged after the connection has been closed.

    Raises:
        psycopg.OperationalError: If the connection cannot be established.
    """
    # Connect before entering the try/finally around the yield: with
    # @contextmanager, an exception from the caller's ``with`` body is
    # re-raised at the ``yield``, so an ``except`` around it would log
    # unrelated operational errors as "Error connecting".
    try:
        conn = psycopg.connect(
            dbname=config["dbname"],
            user=config["user"],
            password=config["password"],
            host=config["host"],
            port=config["port"],
        )
    except psycopg.OperationalError as e:
        logger.error(f"Error connecting to database {config['dbname']}: {e}")
        raise

    logger.info(f"Connected to database {config['dbname']} successfully.")
    try:
        yield conn
    finally:
        conn.close()
        logger.info(f"Connection to database {config['dbname']} closed.")


def fetch_all_data(
    conn: psycopg.Connection, table_name: str
) -> Tuple[List[Tuple], List[str]]:
    """Read every row of ``table_name`` along with its column names.

    The table name is interpolated directly into the SQL text, so it must
    come from a trusted source.

    Returns:
        A ``(rows, column_names)`` pair, where ``rows`` is the result of
        ``cursor.fetchall()`` and ``column_names`` is taken from the cursor
        description.
    """
    statement = f"SELECT * FROM {table_name};"
    with conn.cursor() as cur:
        cur.execute(statement)
        rows = cur.fetchall()
        headers = [col[0] for col in cur.description]
        return rows, headers


def fetch_data_from_table(
    db_configs: List[Dict[str, str]], table_name: str
) -> pd.DataFrame:
    """Load ``table_name`` from every configured database into one DataFrame.

    Each database contributes its rows tagged with a "database" column that
    holds the configuration's "dbname". A database that fails is logged and
    skipped; a database that returns no rows is logged as a warning.

    Returns:
        The concatenation of all per-database frames (with a fresh index),
        or an empty DataFrame when nothing was collected.
    """
    frames = []

    for cfg in db_configs:
        try:
            with get_connection(cfg) as conn:
                rows, headers = fetch_all_data(conn, table_name)

                if not rows:
                    logger.warning(
                        f"No data returned from {cfg['dbname']} for table {table_name}."
                    )
                    continue

                frame = pd.DataFrame(rows, columns=headers)
                frame["database"] = cfg["dbname"]
                frames.append(frame)
        except Exception as err:
            logger.error(f"Error processing database {cfg['dbname']}: {err}")

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


def fuzzy_search(
    df: pd.DataFrame, search_term: str, threshold: int = 80
) -> Dict[str, List[str]]:
    """Perform fuzzy search on the DataFrame based on the search term.

    The search term is split on whitespace and every word is matched
    independently against the distinct values of each text-like column.

    Args:
        df: Data to search. Only columns with object or string dtype are
            considered.
        search_term: Whitespace-separated words to look for.
        threshold: A value matches when its fuzzy score is strictly greater
            than this number.

    Returns:
        Mapping of column name to the matching values (as strings, without
        duplicates, in first-seen order). Columns with no match are omitted.
    """
    results: Dict[str, List[str]] = {}
    search_terms = search_term.split()
    if not search_terms:
        return results

    for column in df.columns:
        series = df[column]
        # Text may be stored either as object dtype or as pandas' dedicated
        # string dtype; accept both.
        if not (
            pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
        ):
            continue

        # Object columns can hold non-string values (dates, Decimals, ...).
        # The matcher works on text, so normalise every candidate to str
        # after dropping missing values.
        candidates = series.dropna().astype(str).unique().tolist()
        if not candidates:
            continue

        matches: List[str] = []
        for term in search_terms:
            scored = process.extract(term, candidates, limit=None)
            matches.extend(match for match, score in scored if score > threshold)

        # Remove duplicates while preserving order
        matches = list(dict.fromkeys(matches))

        if matches:
            results[column] = matches

    return results


# Example usage
# NOTE(review): credentials are hard-coded here for a local demo; confirm
# they are not reused anywhere that matters before sharing this notebook.
db_configs = [
    {
        "dbname": "Employees",
        "user": "postgres",
        "password": "root",
        "host": "localhost",
        "port": "5432",
    }
]

# Fetch all data from the specified table
table_name = "employees"
data_df = fetch_data_from_table(db_configs, table_name)

# Log the DataFrame (emitted through the logger at INFO level, not print)
logger.info("All Data from Employees Table:")
logger.info(data_df)

# Perform a fuzzy search
search_term = "1953 geor cello"  # Example search term; each whitespace-separated word is matched on its own
fuzzy_results = fuzzy_search(data_df, search_term)

# Log the fuzzy search results, one line per column that had matches
logger.info(f"\nFuzzy Search Results for '{search_term}':")
for column, matches in fuzzy_results.items():
    logger.info(f"In column '{column}': {matches}")


INFO:__main__:Connected to database Employees successfully.


INFO:__main__:Connection to database Employees closed.
INFO:__main__:All Data from Employees Table:
INFO:__main__:        emp_no  birth_date first_name last_name gender   hire_date   database
0        10001  1953-09-02     Georgi   Facello      M  1986-06-26  Employees
1        10002  1964-06-02    Bezalel    Simmel      F  1985-11-21  Employees
2        10003  1959-12-03      Parto   Bamford      M  1986-08-28  Employees
3        10004  1954-05-01  Chirstian   Koblick      M  1986-12-01  Employees
4        10005  1955-01-21    Kyoichi  Maliniak      M  1989-09-12  Employees
...        ...         ...        ...       ...    ...         ...        ...
300019  499995  1958-09-24     Dekang  Lichtner      F  1993-01-12  Employees
300020  499996  1953-03-07       Zito      Baaz      M  1990-09-27  Employees
300021  499997  1961-08-03    Berhard    Lenart      M  1986-04-21  Employees
300022  499998  1956-09-05   Patricia   Breugel      M  1993-10-13  Employees
300023  499999  1958-05-01  