In [15]:
import pandas as pd
import sqlite3
import great_expectations as ge
from great_expectations.dataset import PandasDataset
from great_expectations.core.batch import BatchRequest

import subprocess

def initialize_great_expectations():
    try:
        subprocess.run(["great_expectations", "init"], check=True, text=True, input="y")
        print("Great Expectations initialized successfully.")
    except subprocess.CalledProcessError as e:
        print("Error initializing Great Expectations:", e)

def create_validation_suite(context, suite_name):
    context.add_or_update_expectation_suite(expectation_suite_name=suite_name) 

# Extract data from a source (e.g., CSV file)
def extract_data():
    df = pd.read_csv('data.csv')
    return df

# Transform data
def transform_data(data):
    # Perform data transformation operations
    transformed_data = data.dropna()
    return transformed_data

# Load data to a destination (e.g., a database)
def load_data(data):
    # Establish a connection to the SQLite database
    conn = sqlite3.connect('database.db')

    # Create a database table (if it doesn't exist)
    create_table_query = '''
    CREATE TABLE IF NOT EXISTS water_stock (
    region TEXT,
    variable TEXT,
    RID INTEGER,
    yq REAL,
    value INTEGER,
    year INTEGER,
    Series TEXT,
    Unit TEXT,
    Source TEXT,
    Quarter INTEGER
)
    '''
    conn.execute(create_table_query)

    # Insert the transformed data into the database
    with conn:
        data.to_sql('water_stock', conn, if_exists='replace', index=False)

    # Close the database connection
    conn.close()

# Main ETL function
def etl_pipeline():
    # Step 1: Extract data
    extracted_data = extract_data()

    # Step 2: Transform data
    transformed_data = transform_data(extracted_data)

    # Initialize Great Expectations
    context = ge.data_context.DataContext()

    # Create a PandasDataset from the transformed data
    dataset = PandasDataset(transformed_data)

    # Define expectations
    suite_name = "my_expectations"
    create_validation_suite(context, suite_name)
    
    # Validate the transformed data
    batch_request = {"datasource_name": "my_datasource", "data_connector_name": "my_data_connector", 'data_asset_name': 'water_stock'}
    validator = context.get_validator(
        batch_request=BatchRequest(**batch_request),
        expectation_suite_name=suite_name
    )
    validator.validate(run_name='MyRunName')

    # Check the validation status
    assert validator.success, "Validation failed. Check the data quality."

if __name__ == '__main__':
    initialize_great_expectations()
    etl_pipeline()


Great Expectations initialized successfully.


DatasourceError: Cannot initialize datasource my_datasource, error: The given datasource could not be retrieved from the DataContext; please confirm that your configuration is accurate.