# In [None]:
import os
from ydata_profiling import ProfileReport
from ydata_profiling.config import Settings
import argparse
import pandas as pd
import hashlib
import json
import getpass
import oracledb
from clickhouse_driver import Client # Import the clickhouse driver
from dotenv import load_dotenv # Import load_dotenv to load environment variables from .env file
#from .autonotebook import tqdm as notebook_tqdm

# In [None]:
'''
def connect_to_oracle(db_user: str, db_password: str, db_host: str, db_port, db_sid) -> None:
    try: 
        with oracledb.connect(user=db_user, password=db_password, dsn=oracledb.makedsn(db_host, db_port, db_sid)) as connection:
            with connection.cursor() as cursor:
                sql = """select sysdate from dual"""
                for r in cursor.execute(sql):
                    print(r)
    except oracledb.DatabaseError as e:
        error, = e.args
        print(f"Oracle error code: {error.code}")
        print(f"Oracle error message: {error.message}")
    return None
'''
''' --- Configuration Paths ---
Calculate the path to the secrets file relative to the script's directory
'''
# --- Project-relative paths ---
# The project root is taken to be the parent of the directory holding this
# script. __file__ is not defined when the code runs inside a notebook or an
# interactive session, so fall back to the current working directory there.
try:
    script_dir = os.path.dirname(__file__)
except NameError:
    script_dir = os.getcwd()
project_root = os.path.join(script_dir, '..')

# File listing the tables to profile, one table name per line
tables_path = os.path.join(project_root, 'scripts', 'flex11_table_list.txt')

# Directory that receives the generated metadata CSV files
metadata_dir = os.path.join(project_root, 'metadata_profile')

# Secrets file holding the ClickHouse connection settings
secrets_path = os.path.join(project_root, 'env', 'secrets.env')

# Load environment variables from the secrets file.
# override=True makes values in this file take precedence over variables
# that are already set in the process environment.
load_dotenv(dotenv_path=secrets_path, override=True)


def connect_to_clickhouse(user: str, password: str, host: str, port: int) -> "Client | None":
    '''
    Connect to a ClickHouse database and verify the connection.

    Args:
        user (str): Database username
        password (str): Database password
        host (str): Database host
        port (int): Database port

    Returns:
        clickhouse_driver.Client: ClickHouse client connection object, or None
        if creating the client or the probe query fails. Failures are printed,
        never raised.
    '''
    try:
        client = Client(
            host=host,
            port=port,
            user=user,
            password=password
        )
        # Run a trivial query so that connection/authentication problems
        # surface here rather than on the first real query.
        client.execute('SELECT 1')
    except Exception as e:
        print(f"Error connecting to ClickHouse: {e}")
        return None

    print(f"Successfully connected to ClickHouse at {host}:{port}")
    return client

def get_list_of_tables(path_to_file: str) -> list:
    """
    Get a list of tables from a file.

    The file is expected to hold one table name per line. Surrounding
    whitespace is stripped and blank lines are skipped, so a trailing newline
    or an empty separator line never yields an empty table name.

    Args:
        path_to_file (str): path to file containing table names

    Returns:
        list: List of table names; an empty list if the file cannot be read
        (the error is printed, not raised)
    """
    try:
        with open(path_to_file, 'r') as file:
            stripped_lines = (line.strip() for line in file)
            # Drop blank lines: an empty name would later become an invalid
            # "SELECT * FROM " query.
            return [name for name in stripped_lines if name]
    except FileNotFoundError:
        print(f"Error: File not found at the path '{path_to_file}'. Please check the path and try again.")
        return []
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return []

def hash_column(column: pd.Series) -> pd.Series:
    """Return a copy of ``column`` with every non-null value replaced by the
    SHA-256 hex digest of its string form; null values are passed through."""

    def _digest(value):
        # Nulls stay as they are so missing-value statistics are preserved.
        if not pd.notnull(value):
            return value
        encoded = str(value).encode()
        return hashlib.sha256(encoded).hexdigest()

    return column.apply(_digest)

def generate_profiling_report(db_connection: "Client | None" = None, tables_list=None, path_to_csv=None,
                             sensitive_columns=None, sensitive_keywords=None) -> pd.DataFrame:
    """
    Generate a profiling report using ydata-profiling for Clickhouse database tables

    Each table is read in full with ``SELECT *``, loaded into a DataFrame and
    handed to ``_process_dataset``. A failure on one table is printed and does
    not stop the remaining tables from being processed.

    Args:
        db_connection (clickhouse_driver.Client, optional): Connection to Clickhouse database
        tables_list (list, optional): List of tables to profile
        path_to_csv (str, optional): Currently unused; kept so existing callers keep working
        sensitive_columns (list, optional): List of column names to mark as sensitive
        sensitive_keywords (list, optional): Keywords to detect sensitive columns

    Returns:
        DataFrame: DataFrame with profiling information including completeness metrics.
        Empty if the inputs are missing, every table is empty, or every table fails.
    """
    # Default sensitive keywords if not provided
    if sensitive_keywords is None:
        sensitive_keywords = ["bvn", "id number", "nin", "passport", "driver",
                             "identificationnumber", "chn"]

    results_dfs = []  # Per-table profiling results

    try:
        if not (db_connection and tables_list):
            print("Error: db_connection and tables_list must be provided for database profiling.")
            return pd.DataFrame()

        # Database name reported as schema_name. Look on the client first, then
        # on its underlying connection object, and fall back to "default".
        # NOTE(review): which of the two attributes exists depends on the
        # clickhouse_driver version — confirm.
        db_name = (
            getattr(db_connection, 'database', None)
            or getattr(getattr(db_connection, 'connection', None), 'database', None)
            or 'default'
        )

        for table_name in tables_list:
            try:
                print(f"Processing table: {table_name}")

                # NOTE: table_name is interpolated into the SQL text as-is, so
                # the table list file must come from a trusted source.
                query = f"SELECT * FROM {table_name}"

                # Ask the driver for the column metadata of this very result
                # set. A separate DESCRIBE TABLE can list columns that
                # SELECT * does not return, which would break the DataFrame
                # construction below.
                rows, columns_with_types = db_connection.execute(query, with_column_types=True)
                column_names = [name for name, _type in columns_with_types]

                data = pd.DataFrame(rows, columns=column_names)

                if data.empty:
                    print(f"Table {table_name} is empty. Skipping.")
                    continue

                # The table name doubles as the report's source name; db_name
                # is recorded as schema_name.
                result_df = _process_dataset(data, table_name, table_name, db_name,
                                             sensitive_columns, sensitive_keywords)
                results_dfs.append(result_df)

            except Exception as e:
                print(f"Error processing table {table_name}: {e}")

        if not results_dfs:
            return pd.DataFrame()

        # ignore_index=True gives the combined frame a clean 0..n-1 index
        return pd.concat(results_dfs, ignore_index=True)

    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return pd.DataFrame()
    
def _process_dataset(data, source_name, table_name, schema_name, sensitive_columns, sensitive_keywords):
    """
    Profile one table's data and return its per-column statistics.

    Sensitive columns are hashed before profiling. NOTE: the hashing is done
    in place, so the ``data`` frame passed in is modified.

    Args:
        data (DataFrame): Full contents of the table
        source_name (str): Name used in the report title and log messages
        table_name (str): Value stored in the ``table_name`` column
        schema_name (str): Value stored in the ``schema_name`` column
        sensitive_columns (list or None): Columns to hash; when None they are
            detected by matching ``sensitive_keywords`` against column names
        sensitive_keywords (list): Lower-case substrings that mark a column
            name as sensitive

    Returns:
        DataFrame: One row per profiled column, with the ydata-profiling
        variable statistics plus table_name, schema_name, total_records,
        created_at, last_updated, completeness_pct (when computable) and
        is_sensitive.
    """
    # Store the total record count
    total_records = len(data)

    # Automatically detect sensitive columns if not provided
    if sensitive_columns is None:
        sensitive_columns = [
            col for col in data.columns
            if any(keyword in str(col).lower() for keyword in sensitive_keywords)
        ]

    # Only columns that actually exist in this table can be hashed/described
    present_sensitive = [col for col in sensitive_columns if col in data.columns]

    # Hash sensitive columns so raw values never reach the report
    for col in present_sensitive:
        print(f"Hashing sensitive column: {col}")
        data[col] = hash_column(data[col])

    # Configure settings to mark sensitive columns
    config = Settings()
    if sensitive_columns:
        config.variables.descriptions = {col: "Sensitive Data (Hashed)"
                                         for col in present_sensitive}

    # Generate profiling report
    profile = ProfileReport(
        data,
        title=f"{source_name} Profiling Report",
        explorative=True,
        config=config)

    # Get JSON data and extract variables data (one entry per column)
    variables_data = json.loads(profile.to_json())['variables']
    variables_df = pd.DataFrame(variables_data).transpose()
    variables_df = variables_df.reset_index().rename(columns={'index': 'column_name'})

    # Add metadata enrichment; a single timestamp keeps both fields identical
    now = pd.Timestamp.now()
    variables_df['table_name'] = table_name
    variables_df['schema_name'] = schema_name  # this will be the "default" database name
    variables_df['total_records'] = total_records
    variables_df['created_at'] = now
    variables_df['last_updated'] = now

    # Completeness = share of non-missing values among all values.
    # In the profile, 'n' is the total number of values and 'n_missing' the
    # number of missing ones ('count' is already n - n_missing, so it must not
    # have n_missing subtracted again). The transposed frame holds object
    # dtype, hence the explicit numeric conversion; an all-zero total yields
    # NaN instead of raising.
    if 'n' in variables_df.columns and 'n_missing' in variables_df.columns:
        total = pd.to_numeric(variables_df['n'], errors='coerce')
        missing = pd.to_numeric(variables_df['n_missing'], errors='coerce')
        variables_df['completeness_pct'] = ((total - missing) / total * 100).round(2)

    # Mark sensitive columns in the metadata
    variables_df['is_sensitive'] = variables_df['column_name'].isin(sensitive_columns)

    print(f"Successfully generated profile for {source_name} with {len(variables_df)} columns")

    return variables_df

def _append_metadata_rows(existing_df: pd.DataFrame, new_rows: pd.DataFrame, output_path: str) -> None:
    """Add new_rows to the CSV at output_path, rewriting the file if the column layouts differ."""
    if list(new_rows.columns) == list(existing_df.columns):
        # Same layout: a plain header-less append keeps the columns aligned
        new_rows.to_csv(output_path, mode='a', header=False, index=False)
    else:
        # Different layout: a header-less append would put values under the
        # wrong headers, so rewrite the file with the union of the columns.
        pd.concat([existing_df, new_rows], ignore_index=True).to_csv(output_path, index=False)


def generate_metadata_file(var_df: pd.DataFrame, output_path: str = None,
                           output_dir: str = None) -> pd.DataFrame:
    """
    Generate a metadata file from the profiling report.
    If the file exists, checks for duplicates and appends only new records.

    Args:
        var_df (DataFrame): DataFrame with profiling information
        output_path (str, optional): Full path of the metadata CSV. When given
            it is used as-is.
        output_dir (str, optional): Directory for the metadata CSV when
            output_path is not given; the file is named
            "<table>_metadata.csv" after the first row's table_name. Defaults
            to the module-level ``metadata_dir``.

    Returns:
        DataFrame: The metadata now held in the file (existing plus newly
        added rows). An empty DataFrame if var_df is empty or lacks the
        table name; var_df itself if an error occurs.
    """
    try:
        # Create a copy to avoid modifying the original DataFrame
        metadata_df = var_df.copy()

        if metadata_df.empty or 'table_name' not in metadata_df.columns:
            print("Error: Metadata DataFrame is empty or missing required column ('table_name'). Cannot determine filename.")
            return pd.DataFrame()

        if not output_path:
            # NOTE(review): all rows go to one file named after the first
            # row's table, even when var_df covers several tables — confirm
            # this is intended.
            table = metadata_df['table_name'].iloc[0]
            if not table:
                print("Error: Table name is missing in the metadata. Cannot determine filename.")
                return pd.DataFrame()
            target_dir = output_dir if output_dir else metadata_dir
            output_path = os.path.join(target_dir, f"{table}_metadata.csv")

        # Make sure the destination directory exists before writing
        parent_dir = os.path.dirname(output_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Friendlier names for the raw profiling statistics
        rename_map = {
            'n_distinct': 'distinct_count',
            'p_distinct': 'distinct_percentage',
            'type': 'data_type',
            'n_unique': 'unique_count',
            'p_unique': 'unique_percentage',
            'n_missing': 'missing_count',
            'n': 'total_count',
            'p_missing': 'missing_percentage',
            'n_category': 'category_count'
        }

        # Only rename columns that exist and haven't been renamed yet
        rename_cols = {old: new for old, new in rename_map.items()
                       if old in metadata_df.columns and new not in metadata_df.columns}
        if rename_cols:
            metadata_df = metadata_df.rename(columns=rename_cols)

        # Identification and headline columns first, everything else after
        priority_cols = ['schema_name', 'table_name', 'column_name', 'data_type',
                         'total_records', 'total_count', 'missing_count',
                         'completeness_pct', 'is_sensitive',
                         'created_at', 'last_updated']
        leading = [col for col in priority_cols if col in metadata_df.columns]
        trailing = [col for col in metadata_df.columns if col not in leading]
        metadata_df = metadata_df[leading + trailing]

        if not os.path.exists(output_path):
            # File doesn't exist, create new
            metadata_df.to_csv(output_path, index=False)
            print(f"Created new metadata file at {output_path}")
            return metadata_df

        existing_df = pd.read_csv(output_path)

        # A row is considered a duplicate if schema, table, and column name match
        key_columns = [col for col in ('schema_name', 'table_name', 'column_name')
                       if col in metadata_df.columns and col in existing_df.columns]

        if not key_columns:
            # If we can't determine duplicates, append all (might cause duplicates)
            _append_metadata_rows(existing_df, metadata_df, output_path)
            print(f"Appended all records to {output_path} (duplicate checking unavailable)")
            return metadata_df

        # Compare keys as strings: values read back from CSV may have a
        # different type than the in-memory ones (e.g. a numeric-looking name).
        existing_keys = set(
            existing_df[key_columns].astype(str).itertuples(index=False, name=None)
        )
        incoming_keys = metadata_df[key_columns].astype(str).itertuples(index=False, name=None)
        is_new = [key not in existing_keys for key in incoming_keys]
        new_rows = metadata_df[is_new]

        if new_rows.empty:
            print(f"No new records to append to {output_path}")
            return existing_df

        _append_metadata_rows(existing_df, new_rows, output_path)
        print(f"Appended {len(new_rows)} new records to {output_path}")

        # Return the full file contents with a clean, non-repeating index
        return pd.concat([existing_df, new_rows], ignore_index=True)

    except Exception as e:
        print(f"An error occurred while generating the metadata file: {e}")
        return var_df  # Return original DataFrame if we encounter an error
    

    # Example Usage:
if __name__ == "__main__":
    # --- ClickHouse Connection Details ---
    # Read credentials from environment variables loaded from the secrets file.
    # The port is converted only after validation so that a missing value
    # produces a readable error instead of a TypeError from int(None).
    ch_host = os.getenv('host')
    ch_port_raw = os.getenv('port')
    ch_user = os.getenv('user')
    ch_password = os.getenv('password')

    # --- Paths to the file containing table names and the output directory ---
    # These are defined at module level relative to the project root.

    # --- Sensitive Column Configuration ---
    # Optional: List specific column names to mark as sensitive, overrides keyword detection
    # Read from env var, assuming comma-separated string
    # SPECIFIC_SENSITIVE_COLUMNS_STR = os.getenv('SPECIFIC_SENSITIVE_COLUMNS')
    # SPECIFIC_SENSITIVE_COLUMNS = [col.strip() for col in SPECIFIC_SENSITIVE_COLUMNS_STR.split(',')] if SPECIFIC_SENSITIVE_COLUMNS_STR else None

    # Optional: Keywords to detect sensitive columns automatically if SPECIFIC_SENSITIVE_COLUMNS is None
    sensitive_keywords_list = ["bvn", "id number", "nin", "passport", "driver",
                             "identificationnumber", "chn", "email", "phone"]

    # Validate essential configuration
    if not all([ch_host, ch_user, ch_password]):
        print("Error: Essential ClickHouse connection details (CH_HOST, CH_USER, CH_PASSWORD) not found in environment variables loaded from the secrets file.")
        print(f"CH_HOST: {ch_host}, CH_PORT: {ch_port_raw}, CH_USER: {ch_user}")
        raise SystemExit(1)  # Exit if essential connection details are missing

    try:
        ch_port = int(ch_port_raw)
    except (TypeError, ValueError):
        print(f"Error: ClickHouse port must be an integer, got {ch_port_raw!r}.")
        raise SystemExit(1) from None

    # Check if the calculated paths exist
    if not os.path.exists(tables_path):
        print(f"Error: Calculated table list file path does not exist: {tables_path}")
        raise SystemExit(1)

    # The metadata files are written into this directory, so it must exist.
    if not os.path.exists(metadata_dir):
        print(f"Error: Calculated metadata output directory does not exist: {metadata_dir}")
        raise SystemExit(1)

    # Connect to ClickHouse. Keyword arguments are used because the function's
    # parameter order is (user, password, host, port).
    ch_client = connect_to_clickhouse(
        user=ch_user,
        password=ch_password,
        host=ch_host,
        port=ch_port,
    )

    if ch_client:
        try:
            # Get list of tables from the file
            tables_to_profile = get_list_of_tables(tables_path)

            if tables_to_profile:
                # Generate profiling report for ClickHouse tables
                profiling_results_df = generate_profiling_report(
                    db_connection=ch_client,
                    tables_list=tables_to_profile,
                    # sensitive_columns=SPECIFIC_SENSITIVE_COLUMNS, # Use the list from env var or None
                    sensitive_keywords=sensitive_keywords_list
                )

                # Generate and save the metadata file; it is written into the
                # module-level metadata_dir.
                if not profiling_results_df.empty:
                    generate_metadata_file(profiling_results_df)
                else:
                    print("No profiling results to generate metadata file.")

            else:
                print("No tables found in the specified file to profile.")
        finally:
            # Always release the connection, even if profiling raised.
            # clickhouse_driver's Client exposes disconnect(), not close().
            ch_client.disconnect()
    else:
        print("Failed to connect to ClickHouse. Cannot proceed with profiling.")
