In [21]:
import pandas as pd
import great_expectations as ge

In [22]:
# 1. Read the raw transaction export into a DataFrame
df = pd.read_csv(filepath_or_buffer='ecommerce_transactions.csv')

In [29]:
# Inspect the column names; the expectations defined later reference these columns by name
df.columns


Index(['Transaction_ID', 'User_Name', 'Age', 'Country', 'Product_Category',
       'Purchase_Amount', 'Payment_Method', 'Transaction_Date'],
      dtype='object')

In [24]:
# 2. Create the Great Expectations Data Context
# The context is the object the later cells use to register the data source
# and to build the validator.
# NOTE(review): called with no arguments, so the kind of context returned is decided
# by GX defaults — presumably an in-memory (ephemeral) one here; confirm.
context = ge.get_context()

In [25]:
# 3. Register a pandas data source and a dataframe asset on the context
# (fluent workflow; this replaces the deprecated 'from_pandas' method).
# add_or_update_pandas keeps this cell re-runnable: plain add_pandas raises when a
# data source named "pandas_datasource" is already registered on the same context.
data_source = context.data_sources.add_or_update_pandas(name="pandas_datasource")
data_asset = data_source.add_dataframe_asset(name="ecommerce_data")

In [26]:
# Declare a batch definition covering the whole dataframe, then bind `df` to it
batch_definition = data_asset.add_batch_definition_whole_dataframe(name="batch_def")
batch = batch_definition.get_batch(
    batch_parameters={"dataframe": df},
)

# Build the validator that the expectation calls are issued against
validator = context.get_validator(batch=batch)

In [30]:
# 4. Define the business rules as expectations on the validator
# Rule 1: Purchase amount must not be negative (Finance logic)
# NOTE(review): min_value=0 is presumably an inclusive bound, so a zero amount passes;
# pass strict_min=True if amounts must be strictly positive — confirm intent.
validator.expect_column_values_to_be_between("Purchase_Amount", min_value=0)

# Rule 2: Every transaction must carry a Transaction_ID (Operational integrity)
validator.expect_column_values_to_not_be_null("Transaction_ID")

# Rule 3: Every transaction must carry a User_Name (Marketing/CRM tracking)
validator.expect_column_values_to_not_be_null("User_Name")

Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 400.08it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 404.47it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 441.00it/s]


{
  "success": true,
  "expectation_config": {
    "type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "batch_id": "pandas_datasource-ecommerce_data",
      "column": "User_Name"
    },
    "meta": {},
    "severity": "critical"
  },
  "result": {
    "element_count": 50000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [31]:
# 5. Run the validation
# Request the COMPLETE result format so each failed row-level expectation reports
# `unexpected_index_list` (which rows failed). The default format only returns a
# partial sample of unexpected values, which is not enough to locate the rows.
results = validator.validate(result_format="COMPLETE")

Calculating Metrics: 100%|██████████| 14/14 [00:00<00:00, 666.13it/s]


In [32]:
# 6. Output results
# Report the overall gate outcome; on failure, say how many expectations failed.
if results['success']:
    print("Project 1: Data passed the Quality Gate!")
else:
    # Each entry of results['results'] is the outcome of one expectation and
    # carries its own 'success' flag, so count the entries where it is falsy.
    expectation_results = results['results']
    failed_count = sum(1 for res in expectation_results if not res['success'])
    print(
        f"Project 1: Issues detected; validation failed "
        f"({failed_count} of {len(expectation_results)} expectations failed)."
    )

Project 1: Data passed the Quality Gate!


In [34]:
# 7. The Quarantine Logic
# Split `df` into rows that violated at least one expectation (written to
# 'failed_transactions.csv' with a 'failure_reason' column) and rows that
# violated none (written to 'clean_transactions.csv').
all_results = results['results']
bad_indices = set()
failure_reasons = {}
unmapped_failures = []  # failed expectations that reported no row indices

for res in all_results:
    if res['success']:
        continue

    config = res['expectation_config']
    # GX 1.x names this field `type` (older releases used `expectation_type`),
    # so try the current name first and fall back to the legacy one.
    expectation_type = (
        getattr(config, 'type', None)
        or getattr(config, 'expectation_type', None)
        or 'unknown'
    )
    column = config['kwargs'].get('column', 'N/A')
    reason = f"{expectation_type} on {column}"

    # Only present when validation ran with a result format that includes row
    # indices; without it there is no way to tell which rows failed.
    # NOTE(review): assumes entries are plain pandas index labels — confirm for
    # this GX version / configuration.
    failed_index_list = res['result'].get('unexpected_index_list')
    if not failed_index_list:
        unmapped_failures.append(reason)
        continue

    for idx in failed_index_list:
        failure_reasons.setdefault(idx, []).append(reason)
    bad_indices.update(failed_index_list)

# Select and drop by index *label* consistently (the reasons are keyed by label
# too); sorting makes the row order of the quarantine file deterministic.
flagged_labels = sorted(bad_indices)
df_failed = df.loc[flagged_labels].copy()
# Always add the column, even when nothing was flagged, so the failed file has
# the same schema on every run.
df_failed['failure_reason'] = [
    '; '.join(failure_reasons.get(label, ['Unknown'])) for label in df_failed.index
]

df_clean = df.drop(index=flagged_labels)

# Save with reason column in failed data
df_clean.to_csv('clean_transactions.csv', index=False)
df_failed.to_csv('failed_transactions.csv', index=False)

print("Quarantine Complete:")
print(f"Clean Rows: {len(df_clean)}")
print(f"Flagged Rows: {len(df_failed)}")

if unmapped_failures:
    # Surface the gap instead of silently reporting a clean split.
    print(
        "WARNING: these failed expectations reported no row indices, so their rows "
        "were NOT quarantined (run validation with result_format='COMPLETE'): "
        + "; ".join(unmapped_failures)
    )

Quarantine Complete:
Clean Rows: 50000
Flagged Rows: 0
