In [11]:
import great_expectations as ge
import shutil
import pandas as pd
from datetime import datetime
import psycopg2

# Read the CSV file into a Great Expectations dataset; the validation cell
# below declares its expectations directly on this object.
dataset = ge.read_csv("heart_disease.csv")
# Snapshot taken before the validation cell drops rows from `dataset` in place.
# NOTE(review): `master` is not referenced in the cells visible here - confirm
# it is actually used further down, otherwise this copy can be removed.
master = dataset.copy()

In [12]:
import psycopg2

# Smoke test for the PostgreSQL connection: open it, report the outcome and
# close it again. Nothing is read from or written to the database here.
connection_settings = {
    "dbname": 'project',
    "user": 'postgres',
    "password": 'test',
    "host": 'localhost',
    "port": '5432',
}

try:
    conn = psycopg2.connect(**connection_settings)
    # Reaching this line means the server accepted the connection.
    print("Connected to the database!")
    # The connection was only needed for the check, so release it right away.
    conn.close()
except psycopg2.Error as exc:
    # Any psycopg2 failure raised above is reported instead of propagating.
    print("Error connecting to the database:", exc)


Connected to the database!


In [13]:
# Data-quality gate for heart_disease.csv.
#
# 1. Declare the expectations on `dataset` and validate them.
# 2. Every expectation passes   -> move the file to folder_C.
# 3. Otherwise                  -> log each failed expectation to PostgreSQL,
#    then either move the whole file to folder_B (every row is invalid) or
#    split it into folder_B/invalid_data.csv and folder_C/valid_data.csv.

# Define the expectations
dataset.expect_column_values_to_be_of_type("Age", "int64")
dataset.expect_column_values_to_be_between("Cholesterol", min_value=0, max_value=500)
dataset.expect_column_values_to_be_between("Age", min_value=0, max_value=70)
dataset.expect_column_values_to_be_between("Number of vessels fluro", min_value=0, max_value=3)

# Validate the data. COMPLETE asks for the full `unexpected_list`; the default
# format only returns a truncated `partial_unexpected_list`, which would let
# some invalid rows slip through into the "valid" output.
results = dataset.validate(result_format="COMPLETE")
statistics = results["statistics"]
data_quality_issues = results["results"]
failed_expectations = [issue for issue in data_quality_issues if not issue["success"]]


def _record_issues(failed):
    """Insert one row per failed expectation into the `data_quality_issues` table.

    The table is created on first use. The connection is always closed, and
    all rows are committed in a single transaction.
    """
    rows = []
    for issue in failed:
        config = issue["expectation_config"]
        result = issue["result"] or {}
        # Column-level failures (e.g. a dtype check) carry no unexpected values,
        # so fall back to an empty list instead of raising KeyError.
        sample = result.get("partial_unexpected_list") or []
        rows.append((
            config["kwargs"]["column"],
            config["expectation_type"],
            ', '.join(str(v) for v in sample),
        ))

    # NOTE(review): credentials are hardcoded (same values as the connection
    # check cell); move them to environment variables before sharing.
    conn = psycopg2.connect(
        dbname='project',
        user='postgres',
        password='test',
        host='localhost',
        port='5432'
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "CREATE TABLE IF NOT EXISTS data_quality_issues "
                "(id serial PRIMARY KEY, column_name text, expectation text, unexpected_value text)"
            )
            cursor.executemany(
                "INSERT INTO data_quality_issues (column_name, expectation, unexpected_value) VALUES (%s, %s, %s)",
                rows,
            )
        conn.commit()
    finally:
        conn.close()


def _invalid_row_mask(frame, failed):
    """Return a boolean Series marking every row of `frame` hit by a failed expectation.

    A row is flagged once no matter how many expectations it violates. A failed
    expectation that reports no row-level values (column-level check, or an
    expectation that raised) is treated as invalidating every row.
    """
    mask = pd.Series(False, index=frame.index)
    for issue in failed:
        result = issue["result"] or {}
        unexpected = result.get("unexpected_list") or result.get("partial_unexpected_list") or []
        if unexpected:
            column = issue["expectation_config"]["kwargs"]["column"]
            # `.values` sidesteps index alignment; positions already match `frame`.
            mask |= frame[column].isin(unexpected).values
        else:
            mask[:] = True
    return mask


# Check if any data quality issues were found
if not failed_expectations:
    # No data quality issues found, move the file to folder C
    shutil.move("heart_disease.csv", "folder_C/test.csv")
else:
    _record_issues(failed_expectations)

    invalid_mask = _invalid_row_mask(dataset, failed_expectations)
    invalid_rows_data = pd.DataFrame(dataset[invalid_mask.values])

    if invalid_mask.all():
        # All rows have data problems, move the file to folder B
        shutil.move("heart_disease.csv", "folder_B/test.csv")
    else:
        # Write the invalid rows to folder B
        invalid_rows_data.to_csv("folder_B/invalid_data.csv", index=False)

        # Drop the invalid rows in place so later cells keep seeing the cleaned
        # `dataset`, then write the remaining valid rows to folder C.
        dataset.drop(index=dataset.index[invalid_mask.values], inplace=True)
        dataset.to_csv("folder_C/valid_data.csv", index=False)