# Data preparation
* In this stage, data is explored, cleaned, and prepared for modeling.
* GX is used to create and apply data quality definitions to the input data.
* This demo uses the sample [Heart Disease dataset](https://archive.ics.uci.edu/dataset/45/heart+disease) available from the UCI ML Repository.

In [None]:
import altair as alt
import great_expectations as gx
import great_expectations.expectations as gxe
import pandas as pd
import pathlib

import demo_code as demo

## Collect data

In [None]:
DATA_DIR = pathlib.Path("/notebooks/data")

RAW_DATA_DIR = DATA_DIR / "raw"
CLEANED_DATA_DIR = DATA_DIR / "cleaned"

### Download raw patient data

Download heart disease data from [the Heart Disease dataset](https://archive.ics.uci.edu/dataset/45/heart+disease) in the UCI ML Repository.

In [None]:
demo.data.download_uci_heart_disease_data(RAW_DATA_DIR, force=False)

### Ingest raw patient data into dataframe

The columns and their descriptions are defined below, based on the data dictionary in the `heart_disease.names` file contained in the downloaded data.

In [None]:
display(
    demo.data.display_heart_disease_data_dictionary().style.set_properties(
        **{"text-align": "left"}
    )
)

In [None]:
df_heart_disease = demo.data.load_uci_heart_disease_data(RAW_DATA_DIR)

### Display sample of raw patient data

In [None]:
display(df_heart_disease.head())
display(df_heart_disease.tail())

## Examine raw patient data using GX and Expectations

### Define GX data validation workflow

Get a GX Data Context.

In [None]:
context = gx.get_context(mode="ephemeral")

In [None]:
# Code to create a containerized Data Docs site for demo.
context.add_data_docs_site(
    site_config={
        "class_name": "SiteBuilder",
        "show_how_to_buttons": False,
        "store_backend": {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": "/gx/gx_volume/data_docs",
        },
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    },
    site_name="GX in the ML pipeline demo",
)

Create the Data Source, Data Asset, and Batch Definition, and then get the Batch of data.

In [None]:
pandas_data_source = context.data_sources.add_pandas("pandas")
pandas_data_asset = pandas_data_source.add_dataframe_asset(
    name="Raw heart disease data"
)

pandas_batch_definition = pandas_data_asset.add_batch_definition_whole_dataframe(
    "batch definition"
)
batch = pandas_batch_definition.get_batch(
    batch_parameters={"dataframe": df_heart_disease}
)

Create the Expectation Suite.

In [None]:
suite = context.suites.add(
    gx.ExpectationSuite(name="Heart disease data: exploratory")
)

Create and add Expectations to the Suite, based on the provided data definitions.

In [None]:
suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(column="age", min_value=0, max_value=99)
)

suite.add_expectation(gxe.ExpectColumnValuesToBeInSet(column="sex", value_set=[0, 1]))

suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="cp", value_set=[1, 2, 3, 4])
)

suite.add_expectation(gxe.ExpectColumnValuesToBeOfType(column="trestbps", type_="int"))
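To make the semantics of these Expectations concrete, the following is a minimal plain-Python sketch of what each kind of check asserts. It does not use GX: the rows and the helper functions (`column`, `values_between`, `values_in_set`, `values_of_type`) are hypothetical, and the per-value type check only approximates GX, which inspects the column type for a pandas Batch.

```python
# Made-up rows for illustration; the "?" stands for a missing measurement.
rows = [
    {"age": 63, "sex": 1, "cp": 1, "trestbps": 145},
    {"age": 41, "sex": 0, "cp": 2, "trestbps": "?"},
    {"age": 57, "sex": 1, "cp": 4, "trestbps": 130},
]


def column(rows, name):
    """Return one column of the table as a list."""
    return [row[name] for row in rows]


def values_between(values, min_value, max_value):
    """True when every value lies in the inclusive range."""
    return all(min_value <= v <= max_value for v in values)


def values_in_set(values, value_set):
    """True when every value is a member of value_set."""
    return all(v in value_set for v in values)


def values_of_type(values, expected_type):
    """True when every value is an instance of expected_type."""
    return all(isinstance(v, expected_type) for v in values)


age_ok = values_between(column(rows, "age"), 0, 99)
sex_ok = values_in_set(column(rows, "sex"), {0, 1})
cp_ok = values_in_set(column(rows, "cp"), {1, 2, 3, 4})
trestbps_ok = values_of_type(column(rows, "trestbps"), int)

print(age_ok, sex_ok, cp_ok, trestbps_ok)
```

In this sketch only the `trestbps` check fails, because one value is the string `?` rather than an integer.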

Run Expectations against the data and capture the Validation Result.

In [None]:
validation_result = batch.validate(suite)

### Explore Validation Results

Running the Validation returns an `ExpectationSuiteValidationResult` object.

In [None]:
type(validation_result)

The `success` key indicates whether or not all Expectations passed.

In [None]:
validation_result["success"]

The `statistics` key contains summary information on how many Expectations passed.

In [None]:
validation_result["statistics"]
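As a rough illustration of how such a summary relates to the individual results, the sketch below derives the same kind of counts from a list of pass/fail flags. The `summarize` helper is hypothetical, and the key names are chosen to resemble those in the GX output; treat the exact structure as an assumption.

```python
def summarize(successes):
    """Summarize a list of booleans, one per evaluated Expectation."""
    evaluated = len(successes)
    successful = sum(successes)
    return {
        "evaluated_expectations": evaluated,
        "successful_expectations": successful,
        "unsuccessful_expectations": evaluated - successful,
        # Avoid dividing by zero when nothing was evaluated.
        "success_percent": 100.0 * successful / evaluated if evaluated else None,
    }


# Four Expectations, of which the last one failed.
stats = summarize([True, True, True, False])
print(stats)
```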

The `results` key provides a list of results for each Expectation that was run.

First, examine results for a passing Expectation.

In [None]:
validation_result["results"][0]

Next, examine results from a failing Expectation.

In [None]:
validation_result["results"][3]
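Rather than relying on a fixed position in the list, failing results can also be selected by their `success` flag. The sketch below shows the idea on simplified, made-up dictionaries; a real GX result nests the column name inside the Expectation configuration, so the flat shape used here is an assumption for illustration only.

```python
# Simplified stand-ins for per-Expectation results (hypothetical shape).
results = [
    {"expectation": "expect_column_values_to_be_between", "column": "age", "success": True},
    {"expectation": "expect_column_values_to_be_in_set", "column": "sex", "success": True},
    {"expectation": "expect_column_values_to_be_in_set", "column": "cp", "success": True},
    {"expectation": "expect_column_values_to_be_of_type", "column": "trestbps", "success": False},
]

# Keep only the results whose Expectation did not pass.
failing = [result for result in results if not result["success"]]

for result in failing:
    print(f'{result["expectation"]} failed for column {result["column"]}')
```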

Parse the Validation Result object to summarize result data by Expectation in the notebook.

In [None]:
expectation_results = []

for expectation_result in validation_result.describe_dict()["expectations"]:

    params = expectation_result["kwargs"].copy()
    del params["batch_id"]
    del params["column"]

    expectation_results.append(
        {
            "expectation": expectation_result["expectation_type"],
            "column": expectation_result["kwargs"]["column"],
            "params": params,
            "success": expectation_result["success"],
            "element_count": expectation_result["result"]["element_count"],
            "unexpected_count": expectation_result["result"]["unexpected_count"],
            "unexpected_percent": expectation_result["result"]["unexpected_percent"],
            "missing_count": expectation_result["result"]["missing_count"],
            "missing_percent": expectation_result["result"]["missing_percent"],
        }
    )

df_expectation_results = pd.DataFrame(expectation_results)
df_expectation_results

### Explore Validation Results in Data Docs

Create the ValidationDefinition, which pairs a Batch of data with an Expectation Suite.

In [None]:
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="demo validation definition",
        data=pandas_batch_definition,
        suite=suite,
    )
)

Run the ValidationDefinition using a Checkpoint, and write the results to Data Docs.

In [None]:
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="checkpoint",
        validation_definitions=[validation_definition],
        actions=[gx.checkpoint.actions.UpdateDataDocsAction(name="update_data_docs")],
    )
)

results = checkpoint.run(batch_parameters={"dataframe": df_heart_disease})

### View Expectation Suite and Validation Results in [Data Docs](http://localhost:3000)

Data Docs translate Expectation Suites, Validation Results, and other metadata into human-readable documentation that is saved as a static site. Data Docs are part of the GX Core open source offerings.

Data Docs is running as a [containerized site](http://localhost:3000) for this demo.

## Clean data and preserve data quality standards in an Expectation Suite

In [None]:
df_cleaned = df_heart_disease.copy()

### Create Expectation Suite for schema and validity requirements

Create a new Expectation Suite that codifies schema and validity standards for data used for modeling.

In [None]:
schema_and_validity_suite = gx.ExpectationSuite(
    name="Heart disease data: schema and validity"
)

### Clean and define data quality definitions at the same time

Define common cleaning constants.

In [None]:
INTEGER_TYPES = ["int", "BIGINT"]
FLOAT_TYPES = ["float", "DOUBLE_PRECISION"]

Clean the `age` column and define its data quality definitions:
* Clean the `age` column to enforce an `int` type and add an `ExpectColumnValuesToBeInTypeList` Expectation that validates for an `int` or `BIGINT` type.
* Add an `ExpectColumnValuesToBeBetween` Expectation to validate that patient age is between `0` and `99` years.

In [None]:
# Age
df_cleaned["age"] = df_cleaned["age"].astype(int)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="age", type_list=INTEGER_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(column="age", min_value=0, max_value=99)
)

Clean the `trestbps` (resting blood pressure) column and define its data quality definitions:
* Clean the `trestbps` column to enforce a `float` type and convert non-numeric values (such as `?`) to null. Add an `ExpectColumnValuesToBeInTypeList` Expectation that validates for a `float` or `DOUBLE_PRECISION` type.
* Add an `ExpectColumnValuesToNotBeNull` Expectation to validate that `trestbps` is non-null at least 90% of the time.
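The cleaning step can be pictured with the following minimal sketch. The implementation of `demo.data.clean_question_mark_for_float_columns` is not shown in this notebook, so `question_mark_to_nan` is a hypothetical stand-in that assumes the helper maps unparsable values such as `?` to NaN and everything else to `float`.

```python
import math


def question_mark_to_nan(value):
    """Hypothetical stand-in: return value as a float, or NaN if it cannot be parsed."""
    try:
        return float(value)
    except (TypeError, ValueError):
        # "?" raises ValueError, None raises TypeError; both become NaN.
        return math.nan


raw_values = ["145", "130", "?", 120, None]
cleaned_values = [question_mark_to_nan(v) for v in raw_values]
print(cleaned_values)
```

Because NaN is itself a float, the cleaned column keeps a single `float` type while still marking missing measurements as null.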

In [None]:
# Resting blood pressure (trestbps)
df_cleaned["trestbps"] = df_cleaned["trestbps"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="trestbps", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="trestbps", mostly=0.9)
)
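The `mostly` argument relaxes an Expectation so that it passes when at least that fraction of rows meets the condition. A minimal plain-Python sketch of the idea, using a hypothetical `not_null_mostly` helper and made-up values, might look like this:

```python
import math


def is_null(value):
    """Treat None and NaN as null."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def not_null_mostly(values, mostly):
    """Return (passed, non_null_fraction) for a not-null check with a tolerance."""
    if not values:
        return True, 1.0
    non_null = sum(1 for v in values if not is_null(v))
    fraction = non_null / len(values)
    return fraction >= mostly, fraction


# Nine of ten values are present, so the 90% requirement is just met.
values = [145.0, 130.0, math.nan, 120.0, 110.0, 138.0, 150.0, 125.0, 140.0, 135.0]
passed, fraction = not_null_mostly(values, mostly=0.9)
print(passed, fraction)
```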

Continue to clean remaining dataset fields while defining Expectations.

In [None]:
# Sex
df_cleaned["sex"] = df_cleaned["sex"].astype(int)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="sex", type_list=INTEGER_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="sex", value_set=[0, 1])
)

# Chest pain
df_cleaned["cp"] = df_cleaned["cp"].astype(int)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="cp", type_list=INTEGER_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="cp", value_set=[1, 2, 3, 4])
)

# Cholesterol (chol)
df_cleaned["chol"] = df_cleaned["chol"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="chol", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="chol", mostly=demo.data.MOSTLY)
)

# Fasting blood sugar (fbs)
df_cleaned["fbs"] = df_cleaned["fbs"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="fbs", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="fbs", value_set=[0, 1])
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="fbs", mostly=demo.data.MOSTLY)
)

# Resting electrocardiographic results (restecg)
df_cleaned["restecg"] = df_cleaned["restecg"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="restecg", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="restecg", value_set=[0, 1, 2])
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="restecg", mostly=demo.data.MOSTLY)
)

# Maximum heart rate achieved (thalach)
df_cleaned["thalach"] = df_cleaned["thalach"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="thalach", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(column="thalach", min_value=0)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="thalach", mostly=demo.data.MOSTLY)
)

# Exercise induced angina (exang)
df_cleaned["exang"] = df_cleaned["exang"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="exang", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="exang", value_set=[0, 1])
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="exang", mostly=demo.data.MOSTLY)
)

# ST depression induced by exercise relative to rest (oldpeak)
df_cleaned["oldpeak"] = df_cleaned["oldpeak"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="oldpeak", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="oldpeak", mostly=demo.data.MOSTLY)
)

# Slope of the peak exercise ST segment (slope)
df_cleaned["slope"] = df_cleaned["slope"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="slope", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="slope", value_set=[1, 2, 3])
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="slope", mostly=0.6)
)

# Major vessels (0-3) colored by fluoroscopy (ca)
df_cleaned["ca"] = df_cleaned["ca"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="ca", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="ca", value_set=[0, 1, 2, 3])
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="ca", mostly=0.3)
)

# Heart defect (thal)
df_cleaned["thal"] = df_cleaned["thal"].apply(
    lambda x: demo.data.clean_question_mark_for_float_columns(x)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInTypeList(column="thal", type_list=FLOAT_TYPES)
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="thal", value_set=[3, 6, 7])
)
schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="thal", mostly=0.4)
)

# Diagnosis of heart disease (num)
df_cleaned["num"] = df_cleaned["num"].astype(int)
df_cleaned["num"] = df_cleaned["num"].apply(lambda x: 0 if x == 0 else 1)

schema_and_validity_suite.add_expectation(
    gxe.ExpectColumnValuesToBeInSet(column="num", value_set=[0, 1])
)
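The raw `num` field in the UCI data takes the values 0 through 4, where 0 means no disease is present. The cleaning step above collapses it to a binary label, which the small sketch below reproduces on made-up values (`binarize_diagnosis` is a hypothetical name for the same lambda logic).

```python
def binarize_diagnosis(num):
    """Map 0 to 0 (no disease) and any other value to 1 (disease present)."""
    return 0 if num == 0 else 1


raw_diagnoses = [0, 1, 2, 3, 4, 0]
labels = [binarize_diagnosis(n) for n in raw_diagnoses]
print(labels)
```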

Define the expected schema for the dataset, based on the columns present and their order.

In [None]:
# Dataset schema expectations.
schema_and_validity_suite.add_expectation(
    gxe.ExpectTableColumnsToMatchOrderedList(column_list=demo.data.COLUMNS)
)

df_cleaned = df_cleaned[demo.data.CLEANED_COLUMNS]
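An ordered-list schema check is stricter than a set comparison: the same columns in a different order do not match. The following sketch illustrates the distinction with a hypothetical helper and a shortened, made-up column list.

```python
def columns_match_ordered_list(actual_columns, expected_columns):
    """True only when names and positions are identical."""
    return list(actual_columns) == list(expected_columns)


expected_columns = ["age", "sex", "cp", "trestbps"]

same_order = columns_match_ordered_list(["age", "sex", "cp", "trestbps"], expected_columns)
reordered = columns_match_ordered_list(["sex", "age", "cp", "trestbps"], expected_columns)
print(same_order, reordered)
```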

### Create Expectation Suite for distribution requirements

As part of EDA, examine the distribution of the patient `age` column.

In [None]:
alt.Chart(df_cleaned).mark_bar().encode(
    alt.X("age", bin=alt.Bin(extent=[25, 80], step=5)),
    alt.Y("count()"),
    tooltip=["age", "count()"],
)
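The histogram can also be summarized numerically as normalized bin weights, which is one way to obtain the kind of reference distribution used for distribution checks. The sketch below is illustrative only: the ages are made up and `bin_weights` is a hypothetical helper, not part of the demo code.

```python
def bin_weights(values, edges):
    """Return the fraction of values in each bin; the last bin includes its right edge."""
    counts = [0] * (len(edges) - 1)
    last = len(counts) - 1
    for v in values:
        for i in range(len(counts)):
            in_bin = edges[i] <= v < edges[i + 1]
            on_final_edge = i == last and v == edges[-1]
            if in_bin or on_final_edge:
                counts[i] += 1
                break
    total = sum(counts)
    if total == 0:
        raise ValueError("no values fall inside the bin edges")
    return [count / total for count in counts]


ages = [29, 34, 41, 44, 48, 52, 54, 54, 57, 58, 61, 63, 67, 71, 77]
edges = list(range(25, 85, 5))  # 25, 30, ..., 80: twelve edges, eleven bins
weights = bin_weights(ages, edges)
print([round(w, 3) for w in weights])
```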

Create a new Expectation Suite to define distribution requirements.

In [None]:
distribution_suite = gx.ExpectationSuite(
    name="Heart disease data: distribution"
)

Add distribution Expectations for the `age` column based on EDA.

In [None]:
distribution_suite.add_expectation(
    gxe.ExpectColumnQuantileValuesToBeBetween(
        column="age",
        quantile_ranges={
            "quantiles": [0, 0.25, 0.5, 0.75, 1],
            "value_ranges": [[25, 32], [45, 50], [50, 55], [58, 62], [75, 80]],
        },
    )
)

# Disable auto-formatting the bins and weights lists.
# fmt: off
distribution_suite.add_expectation(
    gxe.ExpectColumnKLDivergenceToBeLessThan(
        column="age",
        partition_object={
            "bins": [25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80],
            "weights": [0.004, 0.018, 0.064, 0.107, 0.124, 0.196, 0.212, 0.163, 0.078, 0.026, 0.008],
        },
        threshold=0.1,
    )
)
# fmt: on
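To give a feel for what these two distribution checks measure, here is a minimal plain-Python sketch. It reuses the quantile ranges and reference weights from the cell above, but the ages and observed weights are made up, and the `quantile` and `kl_divergence` helpers are assumptions: GX may use a different quantile method and may handle empty bins differently.

```python
import math


def quantile(sorted_values, q):
    """Quantile by linear interpolation between the closest ranks."""
    position = q * (len(sorted_values) - 1)
    lower, upper = math.floor(position), math.ceil(position)
    fraction = position - lower
    return sorted_values[lower] + fraction * (sorted_values[upper] - sorted_values[lower])


def kl_divergence(observed, expected):
    """KL(observed || expected), skipping bins with no observed mass."""
    return sum(o * math.log(o / e) for o, e in zip(observed, expected) if o > 0)


# Quantile check on made-up ages against the ranges used above.
ages = sorted([29, 34, 41, 44, 48, 52, 54, 54, 57, 58, 61, 63, 67, 71, 77])
quantiles = [0, 0.25, 0.5, 0.75, 1]
value_ranges = [[25, 32], [45, 50], [50, 55], [58, 62], [75, 80]]
quantiles_ok = all(
    low <= quantile(ages, q) <= high for q, (low, high) in zip(quantiles, value_ranges)
)

# KL divergence of a made-up observed histogram against the reference weights.
expected_weights = [0.004, 0.018, 0.064, 0.107, 0.124, 0.196, 0.212, 0.163, 0.078, 0.026, 0.008]
observed_weights = [0.01, 0.02, 0.06, 0.10, 0.13, 0.19, 0.21, 0.16, 0.08, 0.03, 0.01]
divergence = kl_divergence(observed_weights, expected_weights)
divergence_ok = divergence < 0.1

print(quantiles_ok, round(divergence, 4), divergence_ok)
```

A divergence of zero means the observed histogram matches the reference exactly; the threshold sets how much drift is tolerated before the check fails.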

### Persist prepared data to Postgres

Write the prepared data to Postgres (running as a containerized service for this demo).

In [None]:
rows_written = demo.data.write_df_to_postgres(table_name="heart_disease", df=df_cleaned)
print(f"{rows_written} rows written to Postgres.")

### Run Expectation Suites against prepared data in Postgres

Use the same steps as previously shown for the GX data validation workflow.

First, create the Data Source, Data Asset, and Batch Definition.

In [None]:
postgres_data_source = context.data_sources.add_postgres(
    "postgres", connection_string=demo.data.POSTGRES_CONNECTION_STRING
)

postgres_data_asset = postgres_data_source.add_table_asset(
    name="Prepared heart disease data", table_name="heart_disease"
)

postgres_batch_definition = postgres_data_asset.add_batch_definition_whole_table(
    "prepared data batch definition"
)

Add the Expectation Suites and Validation Definitions.

In [None]:
context.suites.add(schema_and_validity_suite)
context.suites.add(distribution_suite)

schema_and_validity_validation_definition = gx.ValidationDefinition(
    name="validation definition for schema and validity",
    data=postgres_batch_definition,
    suite=schema_and_validity_suite,
)

distribution_validation_definition = gx.ValidationDefinition(
    name="validation definition for distribution",
    data=postgres_batch_definition,
    suite=distribution_suite,
)

# Assign the final call's return value to `_` so the notebook does not echo it.
context.validation_definitions.add(schema_and_validity_validation_definition)
_ = context.validation_definitions.add(distribution_validation_definition)

Define and run the Checkpoint, write results to Data Docs.

In [None]:
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="cleaned data checkpoint",
        validation_definitions=[
            schema_and_validity_validation_definition,
            distribution_validation_definition,
        ],
        actions=[gx.checkpoint.actions.UpdateDataDocsAction(name="update_data_docs")],
    )
)

results = checkpoint.run()

### View Expectation Suite and Validation Results in [Data Docs](http://localhost:3000)

## Store Expectation Suite and Validation Results in GX Cloud 

### Clean GX Cloud sandbox for demo

Clean previously created demo entities from the GX Cloud org before re-running the data validation workflow that saves demo entities to GX Cloud.

In [None]:
demo.data.clean_demo_data_from_gx_cloud_org(
    context=gx.get_context(mode="cloud"),
    data_source_name="demo database",
    expectation_suite_names=[
        "Heart disease data: schema and validity",
        "Heart disease data: distribution",
    ],
    validation_definition_names=[
        "schema and validity validation definition",
        "distribution validation definition",
        "new data schema and validity validation definition",
        "new data distribution validation definition",
    ],
    checkpoint_names=[
        "Prepared heart disease data checkpoint",
        "New heart disease data checkpoint",
    ],
)

### Save Expectation Suite to GX Cloud and run data validation

Use the same data validation workflow as before, but with a Cloud Data Context. The `gx.get_context(mode="cloud")` call automatically discovers GX Cloud credentials stored in the `GX_CLOUD_ORGANIZATION_ID` and `GX_CLOUD_ACCESS_TOKEN` environment variables.

In [None]:
# Get a Data Context for GX Cloud organization.
cloud_context = gx.get_context(mode="cloud")
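Since the Cloud Data Context depends on those two environment variables, it can be useful to check for them before creating it. The sketch below is a hypothetical pre-flight check that uses only the standard library; `missing_cloud_credentials` is not part of GX or the demo code.

```python
import os

REQUIRED_VARIABLES = ("GX_CLOUD_ORGANIZATION_ID", "GX_CLOUD_ACCESS_TOKEN")


def missing_cloud_credentials(environ=os.environ):
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARIABLES if not environ.get(name)]


missing = missing_cloud_credentials()
if missing:
    print("Set these environment variables before using mode='cloud':", ", ".join(missing))
else:
    print("GX Cloud credentials found in the environment.")
```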

Run the same data validation workflow, using the Cloud Data Context, to store GX workflow entities in GX Cloud.

In [None]:
# Create Data Source, Data Asset, Batch Definition.
cloud_data_source = cloud_context.data_sources.add_postgres(
    "demo database", connection_string=demo.data.POSTGRES_CONNECTION_STRING
)

cloud_data_asset = cloud_data_source.add_table_asset(
    name="Prepared heart disease data", table_name="heart_disease"
)

cloud_batch_definition = cloud_data_asset.add_batch_definition_whole_table(
    "prepared data batch definition"
)

# Add previously defined Expectation Suites.
cloud_context.suites.add(schema_and_validity_suite)
cloud_context.suites.add(distribution_suite)

# Create Validation Definitions.
cloud_schema_and_validity_validation_definition = gx.ValidationDefinition(
    name="schema and validity validation definition",
    data=cloud_batch_definition,
    suite=schema_and_validity_suite,
)

cloud_distribution_validation_definition = gx.ValidationDefinition(
    name="distribution validation definition",
    data=cloud_batch_definition,
    suite=distribution_suite,
)

cloud_context.validation_definitions.add(
    cloud_schema_and_validity_validation_definition
)
cloud_context.validation_definitions.add(cloud_distribution_validation_definition)

# Create Checkpoint.
cloud_checkpoint = cloud_context.checkpoints.add(
    gx.Checkpoint(
        name="Prepared heart disease data checkpoint",
        validation_definitions=[
            cloud_schema_and_validity_validation_definition,
            cloud_distribution_validation_definition,
        ],
    )
)

### View Data Asset and Expectation Suites in [GX Cloud](https://app.greatexpectations.io)

Run the Checkpoint to persist Validation Results in GX Cloud.

In [None]:
results = cloud_checkpoint.run()

### View Validation Results in [GX Cloud](https://app.greatexpectations.io)