In [2]:
# Creating mock data

In [24]:
!pip install Faker
!pip install pandas



In [1]:
from faker import Faker
import pandas as pd
import random

fake = Faker()
data = []

for _ in range(100):
    data.append([fake.name(), fake.email(), fake.date_of_birth(minimum_age=18), random.randint(10000,1000000)])

df = pd.DataFrame(data, columns=["Name", "Email", "DateOfBirth", "AccountBalance"])
df.to_csv("mock_data.csv", index=False)

In [10]:
# Getting started with snowflakes

In [None]:
!pip install --upgrade snowflake-connector-python
!pip install "snowflake-connector-python[pandas]"

In [None]:
# If you're on a Linux distribution, you'll also need to install a few packages from your distribution's repository. Specifically, you'll need the equivalent of:

# libm-devel
# openssl-devel

In [None]:
# Snowflake login url: https://kduibgy-xn10235.snowflakecomputing.com/console/login

In [1]:
import snowflake.connector
import os

In [2]:
# Gets the version
ctx = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASS'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    session_parameters={
        'QUERY_TAG': 'TestingSnowflakePythonConnector',
    }
    )
cs = ctx.cursor()
try:
    cs.execute("SELECT current_version()")
    one_row = cs.fetchone()
    print(one_row[0])
finally:
    cs.close()
ctx.close()

7.30.0


In [3]:
conn = snowflake.connector.connect(
    user=os.getenv('SNOWFLAKE_USER'),
    password=os.getenv('SNOWFLAKE_PASS'),
    account=os.getenv('SNOWFLAKE_ACCOUNT'),
    session_parameters={
        'QUERY_TAG': 'TestingSnowflakePythonConnector',
    }
)

In [4]:
conn.cursor().execute("CREATE WAREHOUSE IF NOT EXISTS test_warehouse")
conn.cursor().execute("USE WAREHOUSE test_warehouse")
conn.cursor().execute("CREATE DATABASE IF NOT EXISTS test_db")
conn.cursor().execute("USE DATABASE test_db")
conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS test_schema")
conn.cursor().execute("USE SCHEMA test_schema")

<snowflake.connector.cursor.SnowflakeCursor at 0x7f6bac09bd30>

In [14]:
# conn.cursor().execute(
#     "CREATE OR REPLACE TABLE "
#     "test_table("
#     "  Name string,"
#     "  Email string,"
#     "  DateOfBirth date,"
#     "  AccountBalance integer"
#     ")")

<snowflake.connector.cursor.SnowflakeCursor at 0x7fc0216c8160>

In [16]:
# # Uploading file
# conn.cursor().execute("PUT file:///home/yogi/DE/great_expectations/mock_data.csv @%test_table")
# conn.cursor().execute("COPY INTO test_table file_format = (type = csv field_delimiter = ',' skip_header = 1)")

<snowflake.connector.cursor.SnowflakeCursor at 0x7fc0216c8580>

In [5]:
result = conn.cursor().execute("SELECT * FROM test_table")

In [12]:
result_batch_gen = result.fetch_pandas_batches()

In [None]:
for batch_df in result_batch_gen:
    print(batch_df)

In [15]:
batch_df

Unnamed: 0,NAME,EMAIL,DATEOFBIRTH,ACCOUNTBALANCE
0,Tommy Washington,hjohnson@example.com,1932-09-15,144273
1,Kimberly Park,tjohnson@example.com,1908-07-28,740555
2,Madison Price,rosaleswalter@example.org,1920-08-27,636655
3,Justin Smith,elizabetherickson@example.com,1975-11-16,822670
4,Jason Mills,troy16@example.com,1938-03-29,417362
...,...,...,...,...
95,Donna Coffey,iguerra@example.net,1959-03-21,291871
96,Emily Mitchell,mkim@example.org,1939-10-19,306133
97,Aaron Church,barnettmelissa@example.com,1966-10-13,506054
98,Gilbert Benson,sandovaljohn@example.com,1958-09-28,515419


In [16]:
# Great Expectations

In [84]:
# !pip install great_expectations
# !pip install sqlalchemy
!pip install snowflake-sqlalchemy
!pip install 'great_expectations[sqlalchemy]'
!pip install SQLAlchemy==1.4.49



In [69]:
# !pip uninstall -y great_expectations[sqlalchemy]

Found existing installation: great-expectations 0.17.14
Uninstalling great-expectations-0.17.14:
  Successfully uninstalled great-expectations-0.17.14


In [3]:
import great_expectations as ge
from urllib.parse import quote
import os

In [22]:
# !great_expectations -y init

[36m
  ___              _     ___                  _        _   _
 / __|_ _ ___ __ _| |_  | __|_ ___ __  ___ __| |_ __ _| |_(_)___ _ _  ___
| (_ | '_/ -_) _` |  _| | _|\ \ / '_ \/ -_) _|  _/ _` |  _| / _ \ ' \(_-<
 \___|_| \___\__,_|\__| |___/_\_\ .__/\___\__|\__\__,_|\__|_\___/_||_/__/
                                |_|
             ~ Always know what to expect from your data ~
[0m


[36mCongratulations! You are now ready to customize your Great Expectations configuration.[0m

[36mYou can customize your configuration in many ways. Here are some examples:[0m

  [36mUse the CLI to:[0m
    - Run `[32mgreat_expectations datasource new[0m` to connect to your data.
    - Run `[32mgreat_expectations checkpoint new <checkpoint_name>[0m` to bundle data with Expectation Suite(s) in a Checkpoint for later re-validation.
    - Run `[32mgreat_expectations suite --help[0m` to create, edit, list, profile Expectation Suites.
    - Run `[32mgreat_expectations docs --help[0m` to build 

In [4]:
context = ge.get_context()

In [5]:
user=os.getenv('SNOWFLAKE_USER')
password=quote(os.getenv('SNOWFLAKE_PASS')) # encoding to handle special chars in password
account=os.getenv('SNOWFLAKE_ACCOUNT')
role="ACCOUNTADMIN"
warehouse="test_warehouse"
datbase="test_db"
schema="test_schema"
table="test_table"

In [6]:
# connection string format: "snowflake://<USER_NAME>:<PASSWORD>@<ACCOUNT_NAME_OR_LOCATOR>/<DATABASE_NAME>/<SCHEMA_NAME>?warehouse=<WAREHOUSE_NAME>&role=<ROLE_NAME>"
my_connection_string = f"snowflake://{user}:{password}@{account}/{datbase}/{schema}?warehouse={warehouse}&role={role}"

In [7]:
datasource_name = "test_snowflake_datasource"
# # Creating Data source
# datasource = context.sources.add_snowflake(
#     name=datasource_name, 
#     connection_string=my_connection_string
# )

In [8]:
datasource = context.datasources[datasource_name]

In [9]:
asset_name = "test_asset"
asset_table_name = table

In [None]:
# # Creating table asset
# table_asset = datasource.add_table_asset(name=asset_name, table_name=asset_table_name)

In [12]:
#optional query asset 
# asset_name = "test_query_asset"
# query = f"SELECT * from {table} where ACCOUNTBALANCE > 500000"
# query_asset = datasource.add_query_asset(name=asset_name, query=query)

In [10]:
data_asset = context.get_datasource(datasource_name).get_asset(asset_name)
batch_request = data_asset.build_batch_request()

In [11]:
context.add_or_update_expectation_suite("test_expectation_suite")

{
  "expectation_suite_name": "test_expectation_suite",
  "ge_cloud_id": null,
  "expectations": [],
  "data_asset_type": null,
  "meta": {
    "great_expectations_version": "0.17.14"
  }
}

In [12]:
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="test_expectation_suite",
)
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,name,email,dateofbirth,accountbalance
0,Tommy Washington,hjohnson@example.com,1932-09-15,144273
1,Kimberly Park,tjohnson@example.com,1908-07-28,740555
2,Madison Price,rosaleswalter@example.org,1920-08-27,636655
3,Justin Smith,elizabetherickson@example.com,1975-11-16,822670
4,Jason Mills,troy16@example.com,1938-03-29,417362


In [13]:
validator.expect_column_values_to_not_be_null(column="name")
validator.expect_column_values_to_not_be_null(column="accountbalance")

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [14]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [31]:
checkpoint = context.add_or_update_checkpoint(
    name="test_checkpoint",
    run_name_template="test-run_%Y-%m-%d_%H:%M",
    expectation_suite_name="test_expectation_suite",
    validations=[
        {
            "batch_request": batch_request,
        },
    ],
)

In [18]:
#optional checkpoint commands:
# checkpoint_result = checkpoint.run(run_name="test_run")
# context.build_data_docs()
# retrieved_checkpoint = context.get_checkpoint(name="test_checkpoint")

# # test yaml syntax of provided string(yaml_config) is correct
# my_checkpoint = context.test_yaml_config(yaml_config=yaml_config)

Calculating Metrics:   0%|          | 0/16 [00:00<?, ?it/s]

{'local_site': 'file:///home/yogi/DE/great_expectations/great_expectations/uncommitted/data_docs/local_site/index.html'}

In [33]:

checkpoint_result = checkpoint.run()
# context.build_data_docs()

Calculating Metrics:   0%|          | 0/16 [00:00<?, ?it/s]