# Data Quality Rules Execution

## Overview
This notebook executes IBM Cloud Pak for Data (CPD) data quality rules in parallel for improved performance. It provides:

- **Parallel execution** of multiple DQ rules using ThreadPoolExecutor
- **Configurable batch sizes** to control cluster load
- **Results saved to the project** as a timestamped CSV file

## Environment Settings

**CPD Host**: The IBM Cloud Pak for Data cluster endpoint where your data quality rules are deployed.

**Batch Size**: Maximum number of rules to execute concurrently. Adjust based on your cluster capacity.

> ⚠️ **Note**: Higher batch sizes may overwhelm the cluster and cause timeouts. Start with 5 and adjust based on performance.

In [None]:
CPD_HOST = "cp4d-env.company.com"
BATCH_SIZE = 5

## Required Imports

In [41]:
from ibm_watson_studio_lib import access_project_or_space
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import requests
import pandas as pd

import urllib3
urllib3.disable_warnings()

### Initialize Watson Studio library for project access

In [42]:
wslib = access_project_or_space()

### Listing assets by a specific asset type
Call `wslib.assets.list_assets` with an asset type and no filter to retrieve every asset of that type. `wslib.assets.list_asset_types` returns the available asset types, and the generic asset type `asset` retrieves all assets regardless of type. This notebook needs data quality rules, so it passes `data_rule`.

In [43]:
dq_rules = wslib.assets.list_assets("data_rule")
# wslib.show(dq_rules)
# for dq_rule in dq_rules:
#    print(dq_rule['asset_id'] + ' - ' + dq_rule['name'])


## Core Execution Function

Executes a single data quality rule via CPD API and returns structured results.

In [6]:
def execute_rule(token, project_id, rule_id):
    """Start one data quality rule; report failures in the returned dict instead of raising."""
    try:
        url = f"https://{CPD_HOST}/data_quality/v3/projects/{project_id}/rules/{rule_id}/execute"
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {token}'
        }
        
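        # verify=False disables TLS certificate verification; the resulting
        # urllib3 warning is silenced in the imports cell.
        response = requests.post(url, headers=headers, verify=False)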
        
        if response.status_code == 200:
            execution_data = response.json()
            return {
                'success': True,
                'rule_id': rule_id,
                'rule_name': execution_data.get('name', 'Unknown Rule'),
                'status': execution_data.get('status', {}).get('state', 'unknown'),
                'job_id': execution_data.get('job', {}).get('id'),
                'job_run_id': execution_data.get('job_run', {}).get('id')
            }
        else:
            return {
                'success': False,
                'rule_id': rule_id,
                'error': f"HTTP {response.status_code}",
                'response': response.text
            }
            
    except Exception as e:
        return {
            'success': False,
            'rule_id': rule_id,
            'error': str(e)
        }
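
The nested `.get(...)` lookups above can be exercised without a cluster. The following is a minimal sketch, not part of the notebook: `parse_execution_response` is a hypothetical helper that mirrors the success and failure branches of `execute_rule` as a pure function, and the sample payload is invented for illustration rather than taken from the CPD API reference. It also guards against a key that is present but `null`, a case the chained `.get(..., {})` calls do not cover.

```python
def parse_execution_response(rule_id, status_code, body, text=""):
    """Hypothetical helper: turn an HTTP status and parsed JSON body into a result dict."""
    if status_code != 200:
        return {
            "success": False,
            "rule_id": rule_id,
            "error": f"HTTP {status_code}",
            "response": text,
        }
    # `or {}` also covers keys that exist but hold None (JSON null).
    status = body.get("status") or {}
    job = body.get("job") or {}
    job_run = body.get("job_run") or {}
    return {
        "success": True,
        "rule_id": rule_id,
        "rule_name": body.get("name", "Unknown Rule"),
        "status": status.get("state", "unknown"),
        "job_id": job.get("id"),
        "job_run_id": job_run.get("id"),
    }


# Invented payload, shaped only after the keys that execute_rule reads.
sample = {
    "name": "demo rule",
    "status": {"state": "scheduled"},
    "job": {"id": "j-1"},
    "job_run": None,
}
ok = parse_execution_response("r-1", 200, sample)
failed = parse_execution_response("r-2", 404, {}, text="not found")
print(ok["status"], ok["job_run_id"], failed["error"])
```

Keeping the parsing separate from the HTTP call is one possible design choice; it makes the result shape easy to check with plain dictionaries.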

## Parallel Execution Function

Executes multiple data quality rules concurrently using `ThreadPoolExecutor`. Results are collected in completion order, which may differ from the order of `dq_rules`.

In [7]:
def execute_rules_parallel(token, project_id, dq_rules, max_workers=5):
    """Execute rules in parallel and return structured results"""
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all jobs
        future_to_rule = {
            executor.submit(execute_rule, token, project_id, rule['asset_id']): rule
            for rule in dq_rules
        }
        
        # Process as they complete
        for future in as_completed(future_to_rule):
            rule = future_to_rule[future]
            
            try:
                result = future.result()
                results.append((rule, result))
            except Exception as exc:
                results.append((rule, {'success': False, 'error': str(exc)}))
    
    return results
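
To see how `max_workers` caps concurrency, the same submit/`as_completed` pattern can be run against a stub instead of the CPD API. This is an illustrative sketch: `fake_execute`, the sleep durations, and the `stats` counter are assumptions introduced here, not part of the notebook.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

lock = threading.Lock()
# running: stub calls currently in flight; peak: highest value running reached
stats = {"running": 0, "peak": 0}


def fake_execute(rule_id, delay):
    """Stand-in for execute_rule: sleeps instead of calling the API."""
    with lock:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
    time.sleep(delay)
    with lock:
        stats["running"] -= 1
    return {"success": True, "rule_id": rule_id}


# Eight made-up rules with varying simulated durations.
rules = [{"asset_id": f"rule-{i}", "delay": 0.05 * (i % 3 + 1)} for i in range(8)]
results = []

with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_rule = {
        executor.submit(fake_execute, r["asset_id"], r["delay"]): r for r in rules
    }
    # Futures are yielded as they finish, not in submission order.
    for future in as_completed(future_to_rule):
        results.append((future_to_rule[future], future.result()))

print(f"completed={len(results)} peak_concurrency={stats['peak']}")
```

With eight stub rules and `max_workers=3`, the peak never exceeds three, which is the same bound `BATCH_SIZE` places on concurrent calls to the cluster.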

## Results Processing

Converts execution results into a pandas DataFrame for analysis and display.

In [8]:
def results_to_dataframe(results):
    data = []
    for rule, result in results:
        if result.get('success', False):
            data.append({
                'Rule Name': rule['name'],
                'Rule ID': rule['asset_id'],
                'Status': result.get('status', 'unknown'),
                'Job ID': result.get('job_id', 'N/A'),
                'Job Run ID': result.get('job_run_id', 'N/A'),
                'Success': True,
                'Error': None
            })
        else:
            data.append({
                'Rule Name': rule['name'],
                'Rule ID': rule['asset_id'], 
                'Status': 'Failed',
                'Job ID': 'N/A',
                'Job Run ID': 'N/A',
                'Success': False,
                'Error': result.get('error', 'Unknown error')
            })
    
    df = pd.DataFrame(data)
    return df
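
A quick tally of outcomes is often the first thing to check before inspecting individual rows. The sketch below counts successes and groups failures by error message directly from the `(rule, result)` tuples, using only the standard library. The `summarize` helper and the sample data are hypothetical and exist only for illustration.

```python
from collections import Counter


def summarize(results):
    """Hypothetical helper: count successes and tally failure messages."""
    succeeded = sum(1 for _, result in results if result.get("success", False))
    errors = Counter(
        result.get("error", "Unknown error")
        for _, result in results
        if not result.get("success", False)
    )
    return {"total": len(results), "succeeded": succeeded, "errors": dict(errors)}


# Made-up results in the shape returned by execute_rules_parallel.
sample_results = [
    ({"name": "rule a", "asset_id": "a"}, {"success": True, "status": "scheduled"}),
    ({"name": "rule b", "asset_id": "b"}, {"success": False, "error": "HTTP 500"}),
    ({"name": "rule c", "asset_id": "c"}, {"success": False, "error": "HTTP 500"}),
]
summary = summarize(sample_results)
print(summary)
```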

## Save Results

Saves the API call results as a CSV data asset within the project.

In [9]:
def save_results(pandas_df, prefix="dq_run"):
    """Save DataFrame with timestamp in filename"""

    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    filename = f"{prefix}_{timestamp}.csv"
    
    # Save the file
    wslib.save_data(filename, pandas_df.to_csv(index=False).encode())
    print(f"Results saved to: {filename}")
    
    return filename
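
The filename format can be checked in isolation by passing the timestamp in rather than reading the clock. This is a small sketch; `build_filename` is a hypothetical helper, not something the notebook defines.

```python
from datetime import datetime


def build_filename(prefix="dq_run", now=None):
    """Hypothetical helper: same naming scheme as save_results, with an injectable clock."""
    if now is None:
        now = datetime.now()
    return f"{prefix}_{now.strftime('%Y-%m-%d-%H-%M-%S')}.csv"


name = build_filename(now=datetime(2025, 6, 18, 1, 2, 38))
print(name)
```

Because the zero-padded fields run from year down to second, filenames with the same prefix sort chronologically when sorted as plain strings.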

## Execute Rules and Save Results

Main execution block: gets the current project ID and auth token, runs the rules in parallel, converts the results to a DataFrame, and saves it to the project.

In [10]:
project_id = wslib.here.get_ID()
token = wslib.auth.get_current_token()

results = execute_rules_parallel(token, project_id, dq_rules, max_workers=BATCH_SIZE)
df = results_to_dataframe(results)
save_results(df)

Results saved to: dq_run_2025-06-18-01-02-38.csv


'dq_run_2025-06-18-01-02-38.csv'

## Display Results (optional)

In [11]:
display(df)

|   | Rule Name | Rule ID | Status | Job ID | Job Run ID | Success | Error |
|---|---|---|---|---|---|---|---|
| 0 | dqr data uniqueness - taxgroupdata 2 | 557ca02b-c7bd-48b7-a3f0-ea530806a12a | scheduled | f357d422-066d-41d9-be3a-7199269a02bf | 48604c21-dcb6-4e0f-97fd-7c43f623a787 | True | |
| 1 | dqr data uniqueness - taxgroupdata 1 | d5c3d78c-ba91-45f3-89bd-b70195e1f086 | scheduled | 0518b02f-e40a-478c-af0d-846c7bc3a418 | 8426306d-08b8-42b2-b332-fade415b608c | True | |
| 2 | dqr data uniqueness - taxgroupdata | 0691bdd1-a991-4a53-86df-250958f8bcb7 | scheduled | 28a31275-e000-42f8-99b1-dc732753b961 | 2777b92c-4cdb-46a6-812f-f2383b2ac81b | True | |
| 3 | dqr data uniqueness - taxdata 1 | 0f2d8b94-b88b-40ac-8e07-02565b94a4ef | scheduled | 160d8059-b668-4c6d-b006-157745d8c92a | 44738437-2b61-4ec9-9f7e-9f46ff20b218 | True | |
| 4 | dqr data uniqueness - taxdata | c941f3d0-b7c2-482d-bb02-245e62364f62 | scheduled | b6e3650d-ec2c-4c97-8fe3-f925325957e9 | dbba4aa7-063f-4cb8-bd8b-1517f1a7b39a | True | |
| 5 | dqr data uniqueness - taxdata 2 | d94d3be5-45d3-48d4-8173-69429d2bea5e | scheduled | 6fc1f10c-bc81-493b-946b-3b163aa7516b | 1199e639-2402-4d4e-a473-5d5a683a62bd | True | |
