In [52]:
from langchain_openai import AzureChatOpenAI
from dotenv import load_dotenv
import os
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
import json
import threading
import concurrent.futures
from datetime import datetime

import warnings
warnings.filterwarnings("ignore")
load_dotenv(override=True)

True

In [53]:
required_vars = {
    "AZURE_OPENAI_ENDPOINT": os.environ.get("AZURE_OPENAI_ENDPOINT"),
    "AZURE_OPENAI_4o_DEPLOYMENT_NAME": os.environ.get("AZURE_OPENAI_4o_DEPLOYMENT_NAME"),
    "AZURE_OPENAI_API_VERSION": os.environ.get("AZURE_OPENAI_API_VERSION"),
    "AZURE_OPENAI_API_KEY": os.environ.get("AZURE_OPENAI_API_KEY"),
    "SNOWFLAKE_USER": os.environ.get("SNOWFLAKE_USER"),
    "SNOWFLAKE_PASSWORD": os.environ.get("SNOWFLAKE_PASSWORD"),
    "SNOWFLAKE_ACCOUNT": os.environ.get("SNOWFLAKE_ACCOUNT"),
    "SNOWFLAKE_WAREHOUSE": os.environ.get("SNOWFLAKE_WAREHOUSE"),
    "SNOWFLAKE_DATABASE": os.environ.get("SNOWFLAKE_DATABASE"),
    "SNOWFLAKE_SCHEMA": os.environ.get("SNOWFLAKE_SCHEMA")
}

print(required_vars["SNOWFLAKE_SCHEMA"])

TEST2


In [54]:
connection_string = (
    f"snowflake://{required_vars['SNOWFLAKE_USER']}:"
    f"{required_vars['SNOWFLAKE_PASSWORD']}@"
    f"{required_vars['SNOWFLAKE_ACCOUNT']}/"
    f"{required_vars['SNOWFLAKE_DATABASE']}/"
    f"{required_vars['SNOWFLAKE_SCHEMA']}?warehouse="
    f"{required_vars['SNOWFLAKE_WAREHOUSE']}"
)

engine = create_engine(connection_string)
print("Connected to Snowflake")

Connected to Snowflake


In [55]:
query = f"""
    SELECT 
        c.TABLE_NAME, c.COLUMN_NAME, c.DATA_TYPE, c.IS_NULLABLE, c.CHARACTER_MAXIMUM_LENGTH
    FROM {required_vars['SNOWFLAKE_DATABASE']}.INFORMATION_SCHEMA.COLUMNS c
    JOIN {required_vars['SNOWFLAKE_DATABASE']}.INFORMATION_SCHEMA.TABLES t 
        ON c.TABLE_NAME = t.TABLE_NAME
    WHERE t.TABLE_TYPE = 'BASE TABLE' 
    AND c.TABLE_SCHEMA = '{required_vars['SNOWFLAKE_SCHEMA']}'
"""

conn = engine.connect()
metadata = pd.read_sql(query, conn.connection)
metadata.columns = [col.lower() for col in metadata.columns]

print("\nAvailable tables:", metadata['table_name'].unique())
print(metadata.to_string)


Available tables: ['PATIENT_ADMISSIONS']
<bound method DataFrame.to_string of             table_name         column_name data_type is_nullable  \
0   PATIENT_ADMISSIONS          MEDICATION      TEXT         YES   
1   PATIENT_ADMISSIONS          MEDICATION      TEXT         YES   
2   PATIENT_ADMISSIONS          MEDICATION      TEXT         YES   
3   PATIENT_ADMISSIONS      BILLING_AMOUNT    NUMBER         YES   
4   PATIENT_ADMISSIONS      BILLING_AMOUNT    NUMBER         YES   
5   PATIENT_ADMISSIONS      BILLING_AMOUNT    NUMBER         YES   
6   PATIENT_ADMISSIONS              GENDER      TEXT         YES   
7   PATIENT_ADMISSIONS              GENDER      TEXT         YES   
8   PATIENT_ADMISSIONS              GENDER      TEXT         YES   
9   PATIENT_ADMISSIONS                NAME      TEXT          NO   
10  PATIENT_ADMISSIONS                NAME      TEXT          NO   
11  PATIENT_ADMISSIONS                NAME      TEXT          NO   
12  PATIENT_ADMISSIONS   MEDICAL_COND

In [56]:
for i in metadata['table_name'].unique():
    print(i)

PATIENT_ADMISSIONS


In [57]:
table_name = metadata['table_name'].unique()[0]  # Get first table only

print(f"\nRetrieving data from table: {table_name}")

query = f"SELECT * FROM {required_vars['SNOWFLAKE_DATABASE']}.{required_vars['SNOWFLAKE_SCHEMA']}.{table_name}"
conn = engine.connect()
df = pd.read_sql(query, conn.connection)
print(f"Retrieved {len(df)} rows")

df.head(2)


Retrieving data from table: PATIENT_ADMISSIONS
Retrieved 55514 rows


Unnamed: 0,NAME,AGE,GENDER,BLOOD_TYPE,MEDICAL_CONDITION,DATE_OF_ADMISSION,DOCTOR,HOSPITAL,INSURANCE_PROVIDER,BILLING_AMOUNT,ROOM_NUMBER,ADMISSION_TYPE,DISCHARGE_DATE,MEDICATION,TEST_RESULTS
0,test_5@@@###,55,M,O+,Diabetes,2025-02-21,Dr. Sm1th_!!,C!ty H0sp!tal,HealthCare Inc.,2000.5,305,!!!@##ER_ADMISSION###,2025-02-27,Metformin,Stable
1,test_6_error_case,60,F,B-,Chronic pain1234,2025-02-22,Dr. John Doe,City Hospital,No Coverage!!,1800.0,401,0utp@tient123,2025-02-28,Ibuprofen-500MG!,Unkn0wn??


In [58]:
model = AzureChatOpenAI(
    azure_endpoint=required_vars["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=required_vars["AZURE_OPENAI_4o_DEPLOYMENT_NAME"],
    openai_api_version=required_vars["AZURE_OPENAI_API_VERSION"],
    openai_api_key=required_vars["AZURE_OPENAI_API_KEY"],
)

In [59]:
metadata['table_name'].unique()

array(['PATIENT_ADMISSIONS'], dtype=object)

## passing chunks of 1500 rows to AI model in parallel generate DQ rules

In [60]:
table_name

'PATIENT_ADMISSIONS'

## Analysis with Multithreading for large datasets

In [61]:
print(analysis_results)

[]


In [62]:

# # Configuration
# chunk_size = 1500
# max_workers = 4  # Adjust based on your system capabilities
# analysis_results = []
# results_lock = threading.Lock()  # Lock for thread-safe access to shared results


# # Create a thread pool
# with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
#     futures = []
#     offset = 0
#     chunk_num = 1
    
#     # Keep fetching chunks until no more data
#     while True:
#         # Fetch data chunk
#         query = f"""
#         SELECT *
#         FROM {table_name}
#         LIMIT {chunk_size}
#         OFFSET {offset}
#         """
#         chunk = pd.read_sql(query, conn)
        
#         if chunk.empty:
#             break  # Stop if no more data

#         data_sample = df.head(10).to_string()

#         # Define task function inside the loop to capture chunk data
#         def process_chunk(chunk_num, chunk):
#             print(f"Analyzing chunk {chunk_num}...")

#             # Data Quality Analysis Prompt
#             dq_rule_prompt = f"""
#             # Data Quality Rule Generation Protocol
#             Table: {table_name}
            
#             Data Sample:
#             {chunk.to_string()}
            
#             ## Technical Requirements
#             You are an enterprise data quality engine generating precise data quality rules based on the provided data sample. The output must:

#             1. Identify specific data quality issues and anomalies
#             2. Provide technical validations to address identified issues
#             3. Specify compliance concerns and appropriate masking techniques
#             4. Prioritize rules based on severity and business impact
#             5. Include SQL-based validation expressions

#             ## Expected JSON Output Format
#             Return a JSON object with the following structure:

#             {{
#                 "data_quality_rules": [
#                     {{
#                         "rule_name": "DQR_{{TABLE}}_{{RULE_TYPE}}_{{##}}",
#                         "rule_type": "One of [NOT_NULL, UNIQUENESS, RANGE, PATTERN, REFERENTIAL, CUSTOM]",
#                         "affected_columns": ["column_names"],
#                         "validation_expression": "SQL expression or technical rule implementation",
#                         "severity": "One of [HIGH, MEDIUM, LOW]",
#                         "implementation": {{
#                             "phase": "One of [IMMEDIATE, SHORT_TERM, LONG_TERM]",
#                             "complexity": "One of [LOW, MEDIUM, HIGH]",
#                             "validation_sql": "Technical validation SQL to verify rule implementation"
#                         }}
#                     }}
#                 ],
#                     "compliance_rules": [
#                         {{
#                             "column": "column_name",
#                             "compliance_standard": ["applicable standards like PII, PHI, PCI, HIPAA, GDPR, SOC2"],
#                             "masking_technique": "suggested technique",
#                             "severity": "One of [HIGH, MEDIUM, LOW]",
#                             "validation_sql": "SQL to identify compliance violations"
#                             }}
#                         ],
#                         "anomaly_detection_rules": [
#                             {{
#                                 "description": "Technical description of identified anomaly",
#                                 "affected_columns": ["column_names"],
#                                 "detection_expression": "SQL or logic to detect this anomaly",
#                                 "severity": "One of [HIGH, MEDIUM, LOW]",
#                                 "recommended_action": "Specific technical remediation approach"
#                             }}
#                         ]
#                     }}

#                     ## Analysis Guidelines
#                     1. Evaluate completeness, accuracy, consistency, validity, timeliness, uniqueness, and integrity
#                     2. Identify industry-specific compliance violations
#                     3. Detect statistical outliers and distribution irregularities
#                     4. Assess pattern inconsistencies or format violations
#                     5. Evaluate semantic data quality issues beyond basic metrics
#                     6. Consider potential data governance or stewardship concerns

#                     ## Technical Response Requirements
#                     - Ensure each rule has a unique identifier following the naming convention
#                     - For every identified issue, provide a specific SQL validation query
#                     - Include severity ratings based on business impact
#                     - Specify clear implementation phases and complexity
#                     - Provide concrete technical recommendations
#                     """

#             # System prompt for model
#             system_prompt_dq_rule = """You are a specialized data quality analyst expert in Snowflake databases.
#             Analyze the provided data sample focusing on suggesting data quality rules and patterns.
#             Keep responses focused and brief. Ensure JSON format."""

#             messages = [
#                 {"role": "system", "content": system_prompt_dq_rule},
#                 {"role": "user", "content": dq_rule_prompt}
#             ]

#             # Generate analysis
#             response = model.invoke(messages).content.replace("plaintext", "").replace("json", "").replace("```", "").strip()

#             try:
#                 json_data = json.loads(response)  # Ensure valid JSON
#             except json.JSONDecodeError as e:
#                 print(f"Error parsing JSON response: {e}")
#                 print("Original response:", response)  # Log raw response
#                 return  # Skip further processing if JSON parsing fails

#             # Lock to prevent multiple threads from writing at the same time
#             # Lock to prevent multiple threads from writing at the same time
#             with results_lock:
#                 # Check if the file exists, if not create it first
#                 file_path = 'full_chunk_data_quality_rules.xlsx'
#                 file_exists = os.path.isfile(file_path)
                
#                 # Define the writer mode based on file existence
#                 mode = 'a' if file_exists else 'w'
                
#                 # Use the appropriate if_sheet_exists parameter only if file exists
#                 with pd.ExcelWriter(file_path, mode=mode, 
#                                     engine='openpyxl',
#                                     if_sheet_exists='overlay' if file_exists else None) as writer:
#                     if 'data_quality_rules' in json_data:
#                         dq_rules_df = pd.DataFrame(json_data['data_quality_rules'])
#                         sheet_name = 'Data Quality Rules'
#                         dq_rules_df.to_excel(writer, sheet_name=sheet_name, index=False)

#                     if 'compliance_rules' in json_data:
#                         compliance_df = pd.DataFrame(json_data['compliance_rules'])
#                         sheet_name = 'Compliance Rules'
#                         compliance_df.to_excel(writer, sheet_name=sheet_name, index=False)

#                     if 'anomaly_detection_rules' in json_data:
#                         anomaly_df = pd.DataFrame(json_data['anomaly_detection_rules'])
#                         sheet_name = 'Anomaly Rules'
#                         anomaly_df.to_excel(writer, sheet_name=sheet_name, index=False)

#                 print(f"Chunk {chunk_num} data successfully saved to '{file_path}'")

#         # Submit the task to the thread pool
#         future = executor.submit(process_chunk, chunk_num, chunk)
#         futures.append(future)

#         # Move to next chunk
#         offset += chunk_size
#         chunk_num += 1

#     # Ensure all futures complete
#     concurrent.futures.wait(futures)


In [None]:
## save all analysis in excel :

# Configuration
chunk_size = 1500
max_workers = 4  # Adjust based on your system capabilities
analysis_results = {
    'data_quality_rules': [],
    'compliance_rules': [],
    'anomaly_detection_rules': []
}
results_lock = threading.Lock()  # Lock for thread-safe access to shared results


# Create a thread pool
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = []
    offset = 0
    chunk_num = 1
    
    # Keep fetching chunks until no more data
    while True:
        # Fetch data chunk
        query = f"""
        SELECT *
        FROM {table_name}
        LIMIT {chunk_size}
        OFFSET {offset}
        """
        conn = engine.connect()
        chunk = pd.read_sql(query, conn.connection)
        
        if chunk.empty:
            break  # Stop if no more data

        # Define task function inside the loop to capture chunk data
        def process_chunk(chunk_num, chunk):
            print(f"Analyzing chunk {chunk_num}...")

            # Data Quality Analysis Prompt
            dq_rule_prompt = f"""
            # Data Quality Rule Generation Protocol
            Table: {table_name}
            
            Data Sample:
            {chunk.to_string()}
            
            ## Technical Requirements
            You are an enterprise data quality engine generating precise data quality rules based on the provided data sample. The output must:

            1. Identify specific data quality issues and anomalies
            2. Provide technical validations to address identified issues
            3. Specify compliance concerns and appropriate masking techniques
            4. Prioritize rules based on severity and business impact
            5. Include SQL-based validation expressions

            ## Expected JSON Output Format
            Return a JSON object with the following structure:

            {{
                "data_quality_rules": [
                    {{
                        "rule_name": "DQR_{{TABLE}}_{{RULE_TYPE}}_{{##}}",
                        "rule_type": "One of [NOT_NULL, UNIQUENESS, RANGE, PATTERN, REFERENTIAL, CUSTOM]",
                        "affected_columns": ["column_names"],
                        "validation_expression": "SQL expression or technical rule implementation",
                        "severity": "One of [HIGH, MEDIUM, LOW]",
                        "implementation": {{
                            "phase": "One of [IMMEDIATE, SHORT_TERM, LONG_TERM]",
                            "complexity": "One of [LOW, MEDIUM, HIGH]",
                            "validation_sql": "Technical validation SQL to verify rule implementation"
                        }}
                    }}
                ],
                    "compliance_rules": [
                        {{
                            "column": "column_name",
                            "compliance_standard": ["applicable standards like PII, PHI, PCI, HIPAA, GDPR, SOC2"],
                            "masking_technique": "suggested technique",
                            "severity": "One of [HIGH, MEDIUM, LOW]",
                            "validation_sql": "SQL to identify compliance violations"
                            }}
                        ],
                        "anomaly_detection_rules": [
                            {{
                                "description": "Technical description of identified anomaly",
                                "affected_columns": ["column_names"],
                                "detection_expression": "SQL or logic to detect this anomaly",
                                "severity": "One of [HIGH, MEDIUM, LOW]",
                                "recommended_action": "Specific technical remediation approach"
                            }}
                        ]
                    }}

                    ## Analysis Guidelines
                    1. Evaluate completeness, accuracy, consistency, validity, timeliness, uniqueness, and integrity
                    2. Identify industry-specific compliance violations
                    3. Detect statistical outliers and distribution irregularities
                    4. Assess pattern inconsistencies or format violations
                    5. Evaluate semantic data quality issues beyond basic metrics
                    6. Consider potential data governance or stewardship concerns

                    ## Technical Response Requirements
                    - Ensure each rule has a unique identifier following the naming convention
                    - For every identified issue, provide a specific SQL validation query
                    - Include severity ratings based on business impact
                    - Specify clear implementation phases and complexity
                    - Provide concrete technical recommendations
                    """

            # System prompt for model
            system_prompt_dq_rule = """You are a specialized data quality analyst expert in Snowflake databases.
            Analyze the provided data sample focusing on suggesting data quality rules and patterns.
            Keep responses focused and brief. Ensure JSON format."""

            messages = [
                {"role": "system", "content": system_prompt_dq_rule},
                {"role": "user", "content": dq_rule_prompt}
            ]

            # Generate analysis
            response = model.invoke(messages).content.replace("plaintext", "").replace("json", "").replace("```", "").strip()

            try:
                json_data = json.loads(response)  # Ensure valid JSON
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON response for chunk {chunk_num}: {e}")
                print("Original response:", response)  # Log raw response
                return None  # Return None to indicate failure
            
            # Add chunk identifier to each rule for traceability
            for rule_list in json_data.values():
                for rule in rule_list:
                    rule['chunk_id'] = chunk_num
            
            # Return the data instead of writing directly
            return json_data

        # Submit the task to the thread pool
        future = executor.submit(process_chunk, chunk_num, chunk)
        futures.append(future)

        # Move to next chunk
        offset += chunk_size
        chunk_num += 1

    # Collect results from all futures
    for future in concurrent.futures.as_completed(futures):
        result = future.result()
        if result is not None:
            # Safely update the shared results dictionary
            with results_lock:
                for key in result:
                    if key in analysis_results:
                        analysis_results[key].extend(result[key])

# After all threads complete, write the combined results to Excel
file_path = 'full_chunk_data_quality_rules.xlsx'
with pd.ExcelWriter(file_path, engine='openpyxl') as writer:
    if analysis_results['data_quality_rules']:
        dq_rules_df = pd.DataFrame(analysis_results['data_quality_rules'])
        dq_rules_df.to_excel(writer, sheet_name='Data Quality Rules', index=False)
        print(f"Saved {len(dq_rules_df)} data quality rules")

    if analysis_results['compliance_rules']:
        compliance_df = pd.DataFrame(analysis_results['compliance_rules'])
        compliance_df.to_excel(writer, sheet_name='Compliance Rules', index=False)
        print(f"Saved {len(compliance_df)} compliance rules")

    if analysis_results['anomaly_detection_rules']:
        anomaly_df = pd.DataFrame(analysis_results['anomaly_detection_rules'])
        anomaly_df.to_excel(writer, sheet_name='Anomaly Rules', index=False)
        print(f"Saved {len(anomaly_df)} anomaly detection rules")

print(f"All analysis results successfully saved to '{file_path}'")


Analyzing chunk 1...
Analyzing chunk 2...
Analyzing chunk 3...
Analyzing chunk 4...
Analyzing chunk 5...
Analyzing chunk 6...
Analyzing chunk 7...
Analyzing chunk 8...
Analyzing chunk 9...
Analyzing chunk 10...
Analyzing chunk 11...
Analyzing chunk 12...
Analyzing chunk 13...
Analyzing chunk 14...
Analyzing chunk 15...
Analyzing chunk 16...
Analyzing chunk 17...
Analyzing chunk 18...
Analyzing chunk 19...
Analyzing chunk 20...
Analyzing chunk 21...
Analyzing chunk 22...
Analyzing chunk 23...
Analyzing chunk 24...
Analyzing chunk 25...
Analyzing chunk 26...
Analyzing chunk 27...
Analyzing chunk 28...
Analyzing chunk 29...
Analyzing chunk 30...
Analyzing chunk 31...
Analyzing chunk 32...
Analyzing chunk 33...
Analyzing chunk 34...
Analyzing chunk 35...
Analyzing chunk 36...
Analyzing chunk 37...
Analyzing chunk 38...
Saved 155 data quality rules
Saved 68 compliance rules
Saved 67 anomaly detection rules
All analysis results successfully saved to 'full_chunk_data_quality_rules.xlsx'
