In [9]:
from postgresql1 import host, database, port, username, password
import sqlalchemy
import duckdb
import pandas as pd

# Open a DuckDB connection (nothing in this section queries it).
con = duckdb.connect()

# Build the PostgreSQL URL with URL.create instead of raw string
# interpolation, so credentials containing reserved characters such as
# '@', ':', '/' or '%' are percent-escaped rather than corrupting the URL.
# render_as_string(hide_password=False) keeps postgres_url a plain str.
postgres_url = sqlalchemy.engine.URL.create(
    "postgresql",
    username=username,
    password=password,
    host=host,
    port=port,
    database=database,
).render_as_string(hide_password=False)
engine1 = sqlalchemy.create_engine(postgres_url)

# Query data from PostgreSQL using pandas and show the first row as a
# quick sanity check that the connection and table are reachable.
employees_df = pd.read_sql("SELECT * FROM employees", engine1)
print(employees_df.head(1))

# Step 3: Get schema information from PostgreSQL.
# One row per column of public.employees, in table-definition order;
# data_type is aliased to column_type for the later steps.
schema_query = """
SELECT 
    column_name,
    data_type as column_type
FROM information_schema.columns 
WHERE table_name = 'employees' 
    AND table_schema = 'public'
ORDER BY ordinal_position
"""
schema_df = pd.read_sql(schema_query, engine1)
print(f"\n📋 Found {len(schema_df)} columns in the employees table")

# Step 4: Generate column-specific DQC SQL queries for PostgreSQL
def generate_dqc_sql(col_name, col_type, table_name='employees'):
    """Build one PostgreSQL data-quality query for a single column.

    The query yields a single row containing, in this order: the column
    name and type (as literals), ``total_rows``, ``null_count``,
    ``distinct_count``, ``empty_varchar_count`` (string columns only),
    ``example_value``, and ``min_val``/``max_val`` (numeric and date/time
    columns only).

    Args:
        col_name: Column name. It is embedded both as a double-quoted
            identifier and as a string literal, escaped for each context.
        col_type: Type name, e.g. the ``data_type`` value reported by
            ``information_schema.columns``. Matched case-insensitively.
        table_name: Table to profile. Interpolated verbatim, so it must
            come from a trusted source.

    Returns:
        The SQL text with no leading or trailing whitespace.
    """
    # Escaping is done with plain concatenation: a backslash inside an
    # f-string expression is a SyntaxError before Python 3.12.
    col_quoted = '"' + col_name.replace('"', '""') + '"'
    name_literal = "'" + col_name.replace("'", "''") + "'"
    type_literal = "'" + col_type.replace("'", "''") + "'"

    type_key = col_type.strip().upper()
    is_varchar = type_key in (
        'VARCHAR', 'STRING', 'TEXT', 'CHARACTER VARYING', 'CHARACTER', 'CHAR',
    )
    # information_schema reports e.g. "timestamp without time zone" and
    # "time with time zone", so date/time types are matched by prefix.
    has_range = (
        type_key in (
            'INTEGER', 'BIGINT', 'SMALLINT', 'NUMERIC', 'DECIMAL', 'REAL',
            'DOUBLE PRECISION', 'DATE',
        )
        or type_key.startswith('TIME')
    )

    select_items = [
        f"{name_literal} AS column_name",
        f"{type_literal} AS column_type",
        "COUNT(*) AS total_rows",
        f"SUM(CASE WHEN {col_quoted} IS NULL THEN 1 ELSE 0 END) AS null_count",
        f"COUNT(DISTINCT {col_quoted}) AS distinct_count",
    ]

    # String columns: count empty strings and skip them when picking an example.
    example_filter = f"{col_quoted} IS NOT NULL"
    if is_varchar:
        select_items.append(
            f"SUM(CASE WHEN {col_quoted} = '' THEN 1 ELSE 0 END) AS empty_varchar_count"
        )
        example_filter += f" AND {col_quoted} != ''"

    # Example value via a scalar subquery with LIMIT 1 (no ORDER BY, so
    # which row is picked is up to the database).
    select_items.append(
        f"(SELECT {col_quoted} FROM {table_name} "
        f"WHERE {example_filter} LIMIT 1) AS example_value"
    )

    # Min/max only for types where a range is meaningful.
    if has_range:
        select_items.append(f"MIN({col_quoted}) AS min_val")
        select_items.append(f"MAX({col_quoted}) AS max_val")

    columns_sql = ",\n    ".join(select_items)
    return f"SELECT\n    {columns_sql}\nFROM {table_name}"

# Step 5: Pair each schema column with its generated DQC query.
# Every entry is a (column name, column type, SQL text) tuple, in the same
# order as the rows of schema_df.
queries = [
    (column, dtype, generate_dqc_sql(column, dtype))
    for column, dtype in zip(schema_df["column_name"], schema_df["column_type"])
]

def run_query_safe(col_name, col_type, sql):
    """Execute one DQC query, never raising.

    Returns the DataFrame produced by running ``sql`` through ``engine1``.
    If anything goes wrong, the error is printed and a one-row placeholder
    frame is returned instead, carrying the column name and type, zeroed
    counts, and the error text in an ``error`` column.
    """
    try:
        # Run the statement against PostgreSQL via the shared engine.
        return pd.read_sql(sql, engine1)
    except Exception as exc:
        print(f"❌ Error on column {col_name}: {exc}")
        # Capture the text here: the exception name is unbound once the
        # except block ends.
        failure_text = str(exc)

    placeholder_row = {
        'column_name': col_name,
        'column_type': col_type,
        'total_rows': 0,
        'null_count': 0,
        'distinct_count': 0,
        'error': failure_text,
    }
    return pd.DataFrame([placeholder_row])

print(f"\n🚀 Running DQC on {len(queries)} columns...")
results = [run_query_safe(col, dtype, sql) for col, dtype, sql in queries]

# Step 6: Combine all results
# pd.concat raises ValueError on an empty list, so when no columns were
# found fall back to an empty frame that still has the core columns the
# summary below reads.
if results:
    dqc_results_df = pd.concat(results, ignore_index=True)
else:
    dqc_results_df = pd.DataFrame(
        columns=['column_name', 'column_type', 'total_rows', 'null_count', 'distinct_count']
    )

# Step 7: Show the DQC summary
print("\n📊 === Data Quality Summary ===")
print(dqc_results_df)

# Additional summary statistics.
# Only produced when no column query failed: failed columns carry
# placeholder zero counts, which would distort the ratios below.
if 'error' not in dqc_results_df.columns or dqc_results_df['error'].isna().all():
    print("\n📈 === Additional Insights ===")
    # Every per-column query counts the same table, so the first row's
    # total is used as the table's row count.
    total_rows = dqc_results_df['total_rows'].iloc[0] if len(dqc_results_df) > 0 else 0
    print(f"Total rows in table: {total_rows:,}")

    if total_rows > 0:
        # Null percentage by column
        dqc_results_df['null_percentage'] = (dqc_results_df['null_count'] / dqc_results_df['total_rows'] * 100).round(2)

        # Uniqueness ratio (distinct values / rows)
        dqc_results_df['uniqueness_ratio'] = (dqc_results_df['distinct_count'] / dqc_results_df['total_rows']).round(4)

        print("\nColumns with highest null percentages:")
        high_nulls = dqc_results_df.nlargest(5, 'null_percentage')[['column_name', 'null_percentage']]
        for _, row in high_nulls.iterrows():
            print(f"  • {row['column_name']}: {row['null_percentage']}%")

        print("\nColumns with low uniqueness (potential duplicates):")
        # Report at most the five least-unique columns below the 0.95 threshold.
        low_unique = dqc_results_df[dqc_results_df['uniqueness_ratio'] < 0.95].nsmallest(5, 'uniqueness_ratio')[['column_name', 'uniqueness_ratio']]
        for _, row in low_unique.iterrows():
            print(f"  • {row['column_name']}: {row['uniqueness_ratio']} ratio")

# Clean up connections
engine1.dispose()
con.close()

print("\n✅ Data Quality Check completed!")

   id  name  department_id  first_salary   hire_date
0   1  John              1         60000  2016-03-15

📋 Found 5 columns in the employees table

🚀 Running DQC on 5 columns...

📊 === Data Quality Summary ===
     column_name        column_type  total_rows  null_count  distinct_count  \
0             id            integer          50           0              50   
1           name  character varying          50           0              46   
2  department_id            integer          50           0              10   
3   first_salary            integer          50           0              21   
4      hire_date  character varying          50           0              50   

  example_value  min_val  max_val  empty_varchar_count  
0             1      1.0     50.0                  NaN  
1          John      NaN      NaN                  0.0  
2             1      1.0     10.0                  NaN  
3         60000  55000.0  79000.0                  NaN  
4    2016-03-15      NaN     