# Great Expectations - SQL DB

## Install & Import Dependencies

In [1]:
%pip install 'great_expectations[sqlalchemy]'
%pip install 'great_expectations[postgresql]'
%pip install SQLAlchemy
%pip install psycopg2-binary
%pip install sqlalchemy

Note: you may need to restart the kernel to use updated packages.
Collecting psycopg2-binary>=2.7.6 (from great_expectations[postgresql])
  Obtaining dependency information for psycopg2-binary>=2.7.6 from https://files.pythonhosted.org/packages/e3/b2/f578b59b83563648c7224bae9397dc4bab6fe2dd2b4338786bb7e373bc4a/psycopg2_binary-2.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Using cached psycopg2_binary-2.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Collecting sqlalchemy>=1.4.0 (from great_expectations[postgresql])
  Obtaining dependency information for sqlalchemy>=1.4.0 from https://files.pythonhosted.org/packages/99/f4/5c7868896285b0d95b6b3f0310850c6cf50b965569417c2959d2bd6a115d/SQLAlchemy-2.0.21-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Using cached SQLAlchemy-2.0.21-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.4 kB)
Collecting greenlet!=0.4.17 (from sqlalchemy>=1.4.0-

In [32]:
%%bash

# Print the installed Great Expectations CLI version so it can be compared with
# the minimum this notebook needs. 0.17.19 itself is acceptable, hence "or higher".
installed_version="$(great_expectations --version | grep -oP 'version \K[^ ]+')"
echo "Verify your installed version ${installed_version} is 0.17.19 or higher"

Verify your installed version 0.17.19 is higher than 0.17.19


In [2]:
import os

import great_expectations as gx
import pandas as pd
from great_expectations.checkpoint import Checkpoint
from great_expectations.data_context import FileDataContext
from sqlalchemy import create_engine

## Initialize local variables

In [3]:
# variables
# Root directory under which the GX project is created. Set the GX_PROJECT_ROOT_DIR
# environment variable to point at your own checkout instead of editing this cell;
# the fallback is the original author's local path.
path_to_repo_dir = os.environ.get(
    "GX_PROJECT_ROOT_DIR",
    "/home/anku/sandbox/DAMG7245-Fall2023",  # TODO: change this to your local path
)

## Initialize GX dir

In [4]:
# File-backed Great Expectations context rooted at the repository directory
# configured above.
context = FileDataContext.create(
    project_root_dir=path_to_repo_dir,
)

## DB Connection String

In [5]:
# connection_string = "postgresql+psycopg2://<username>:${MY_PASSWORD}@<host>:<port>/<database>"
# Read the SQLAlchemy URL from the NOAA_DB_CONNECTION_STRING environment variable so
# real credentials never have to be typed into the notebook. The fallback is the
# local development database this notebook was written against.
# NOTE(review): the fallback still embeds a password; keep it for throwaway local
# databases only.
connection_string = os.environ.get(
    "NOAA_DB_CONNECTION_STRING",
    "postgresql+psycopg2://postgres:root@localhost:5432/postgres",
)

## Insert data into database

NEXRAD Station List
* Source - https://www.ncei.noaa.gov/products/radar/next-generation-weather-radar

In [6]:
engine = create_engine(connection_string)
try:
    # Open and immediately release a connection so a bad connection string fails
    # here, before the download. The original called engine.connect() without
    # ever closing the connection it returned.
    with engine.connect():
        pass

    # Character ranges of the fixed-width columns to keep from the station file.
    cols = [
        (20, 51),    # Name
        (72, 75),    # ST
        (106, 116),  # Lat
        (116, 127)   # Lon
    ]
    # skiprows=[1] drops the second line of the file
    # (presumably the dashed rule under the header; confirm against the file).
    df = pd.read_fwf(r"https://www.ncei.noaa.gov/access/homr/file/nexrad-stations.txt", colspecs=cols, skiprows=[1])
    # if_exists='replace' recreates noaa_tbl on each run, so the cell can be re-run.
    df.to_sql(name='noaa_tbl', con=engine, index=False, if_exists='replace')
finally:
    # Release the engine's pooled connections even if the download or insert fails.
    engine.dispose()
df.head()

Unnamed: 0,NAME,ST,LAT,LON
0,ABERDEEN,SD,45.455833,-98.413333
1,ALBUQUERQUE,NM,35.149722,-106.82388
2,NORFOLK RICH,VA,36.98405,-77.007361
3,AMARILLO,TX,35.233333,-101.70927
4,MIAMI,FL,25.611083,-80.412667


## GX : Create Data Source

Here the data source is a table called `noaa_tbl` in the SQL database.

In [7]:
# Register the Postgres database as a GX SQL datasource. add_or_update_sql is used
# instead of add_sql so this cell can be re-run against the file-backed context
# after "SQL_DB_Postgres" has already been registered, matching the
# add_or_update_* calls used for the suite and checkpoint in this notebook.
datasource = context.sources.add_or_update_sql(name="SQL_DB_Postgres", connection_string=connection_string)

In [8]:
# Expose the noaa_tbl table as the "noaa_station_data" asset; the returned asset
# is the cell's displayed output.
datasource.add_table_asset(
    table_name="noaa_tbl",
    name="noaa_station_data",
)

TableAsset(name='noaa_station_data', type='table', id=None, order_by=[], batch_metadata={}, splitter=None, table_name='noaa_tbl', schema_name=None)

In [9]:
# Look the asset up by name, then build a batch request with no extra options.
noaa_station_asset = datasource.get_asset("noaa_station_data")
batch_request = noaa_station_asset.build_batch_request()

## GX : Create Expectations Suite

An Expectation Suite holds the validations/checks to be run against the data.

In [10]:
expectation_suite_name = "NOAA_Station_Data_Expectation_Suite"
# Create the suite, or update it if it already exists from an earlier run.
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
# The validator ties the batch request to the suite that expectations are recorded in.
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

# Leave the preview as the cell's last expression so Jupyter renders it as a
# table instead of the plain-text dump that print() produces.
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

           NAME  ST        LAT         LON
0      ABERDEEN  SD  45.455833  -98.413333
1   ALBUQUERQUE  NM  35.149722 -106.823880
2  NORFOLK RICH  VA  36.984050  -77.007361
3      AMARILLO  TX  35.233333 -101.709270
4         MIAMI  FL  25.611083  -80.412667


In [11]:
# Two-letter postal codes accepted in the ST column: the 50 states plus DC and
# the territories AS, GU, MP, PR and VI. Order matches the original listing.
states = """
AL AK AS AZ AR CA CO CT DE DC FL GA GU HI ID IL IN IA KS
KY LA ME MD MA MI MN MS MO MT NE NV NH NJ NM NY NC ND MP
OH OK OR PA PR RI SC SD TN TX UT VT VA VI WA WV WI WY
""".split()
len(states)

56

In [12]:
# Every row must carry a station name.
validator.expect_column_values_to_not_be_null(
    column="NAME",
)
# ST must be one of the known postal codes; this result is the cell's output.
validator.expect_column_values_to_be_in_set(
    column="ST",
    value_set=states,
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 209,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 4,
    "missing_percent": 1.9138755980861244,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [13]:
# Save the expectations recorded on the validator back to the expectation suite.
validator.save_expectation_suite()

## GX : Create Checkpoint

A Checkpoint runs the validations defined in the Expectation Suite against the data source and creates a report.

In [14]:
my_checkpoint_name = "NOAA_Station_Data_Checkpoint_v1"

# Actions attached to the checkpoint; going by the class names, they store the
# validation result and then update Data Docs.
checkpoint_actions = [
    {"name": "store_validation_result", "action": {"class_name": "StoreValidationResultAction"}},
    {"name": "update_data_docs", "action": {"class_name": "UpdateDataDocsAction"}},
]

# Bind the batch request and the suite together under a named checkpoint.
# run_name_template is a strftime-style pattern with a "-Manual_Run" suffix.
checkpoint = Checkpoint(
    name=my_checkpoint_name,
    data_context=context,
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
    run_name_template="%Y%m%d-%H%M%S-Manual_Run",
    action_list=checkpoint_actions,
)

In [15]:
# Register the checkpoint with the context (create or overwrite); the returned
# checkpoint is the cell's displayed output.
context.add_or_update_checkpoint(
    checkpoint=checkpoint,
)

{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "update_data_docs",
      "action": {
        "class_name": "UpdateDataDocsAction"
      }
    }
  ],
  "batch_request": {
    "datasource_name": "SQL_DB_Postgres",
    "data_asset_name": "noaa_station_data",
    "options": {}
  },
  "class_name": "Checkpoint",
  "config_version": 1.0,
  "evaluation_parameters": {},
  "expectation_suite_name": "NOAA_Station_Data_Expectation_Suite",
  "module_name": "great_expectations.checkpoint",
  "name": "NOAA_Station_Data_Checkpoint_v1",
  "profilers": [],
  "run_name_template": "%Y%m%d-%H%M%S-Manual_Run",
  "runtime_configuration": {},
  "validations": []
}

In [16]:
# Run the checkpoint: validate the batch against the suite and execute the
# configured actions.
checkpoint_result = checkpoint.run()
# Show the overall pass/fail flag as the cell output so a failed validation is
# visible here, without having to open Data Docs.
checkpoint_result.success

Calculating Metrics:   0%|          | 0/19 [00:00<?, ?it/s]

In [None]:
# Open the generated Data Docs report for review.
context.open_data_docs()