<a href="https://colab.research.google.com/github/samer-glitch/Federated-Governance-and-Provenance-Scoring-for-Trustworthy-AI-A-Metadata-Ledger-Approach/blob/main/Provenance_Score_Engine_and_Implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# 1. INSTALL & IMPORT
!pip install --quiet pandas openpyxl
import pandas as pd, numpy as np, io, os, hashlib
from datetime import datetime
from google.colab import files
from IPython.display import display, HTML

# 2. UPLOAD CONFIG & CHARTER
print("Upload config.xlsx and charter.xlsx")
uploaded = files.upload()
config_file = [fn for fn in uploaded if "config" in fn][0]
config = pd.read_excel(io.BytesIO(uploaded[config_file]), sheet_name='Config').set_index('Field')['Value']
charter_file = [fn for fn in uploaded if "charter" in fn][0]
charter = pd.read_excel(io.BytesIO(uploaded[charter_file]), sheet_name='Charter')

# 3. UPLOAD DATA
print("Upload raw.csv")
data_file = files.upload()
data = pd.read_csv(io.BytesIO(next(iter(data_file.values()))))
data.replace('?', np.nan, inplace=True)
data.columns = data.columns.str.strip().str.lower().str.replace('-', '_')

# 4. AUTOMATIC QUALITY SCORING (DIM 2)
def score_completeness(df):
    return 1 - df.isna().mean().mean()
def score_duplication(df):
    return 1 - len(df.drop_duplicates())/len(df)
def score_error_rate(df):
    return df.isna().sum().sum() / df.size
def score_consistency(df):
    scores = []
    for col in df:
        non_null = df[col].dropna()
        if non_null.empty: scores.append(1)
        elif pd.api.types.is_numeric_dtype(non_null): scores.append(non_null.apply(lambda x: isinstance(x, (int, float))).mean())
        else: scores.append(non_null.apply(lambda x: isinstance(x, str)).mean())
    return np.mean(scores)
def to_score(val, bins, labels):
    return int(pd.cut([val], bins=bins, labels=labels, include_lowest=True)[0])

comp_score = to_score(score_completeness(data), [0, .90, .95, .98, .99, .995, 1], [0,1,2,3,4,5])
dup_score  = to_score(score_duplication(data), [0,.001,.005,.01,.02,.05,1], [5,4,3,2,1,0])
err_score  = to_score(score_error_rate(data), [0,.001,.005,.01,.02,.05,1], [5,4,3,2,1,0])
cons_score = to_score(score_consistency(data), [0,.7,.88,.93,.95,.98,1], [0,1,2,3,4,5])
dim2_score = np.mean([comp_score, dup_score, err_score, cons_score])

# 5. MANUAL SCORING (ADMINISTRATOR: refer to charter.xlsx)
def prompt_scores(dim_name, fields):
    return [int(input(f"{dim_name}.{f} (0–5): ")) for f in fields]

dim1_fields = ["Source Reputation", "Data Controller", "Data Objective"]
dim3_fields = ["Data Dictionary", "Version Logs", "Collection Protocol", "Updates on Definitions"]
dim5_fields = ["Regulation Coverage", "Explicit Consent", "Geo-Restrictions", "Sensitivity Classification", "Audits & Certifications"]
dim6_fields = ["License Terms", "Ethical Reviews", "Redistribution", "User Agreements"]

print("Manual scoring based on your project policy and charter.xlsx.")
dim1_score = np.mean(prompt_scores("Dim1", dim1_fields))
dim3_score = np.mean(prompt_scores("Dim3", dim3_fields))
dim5_score = np.mean(prompt_scores("Dim5", dim5_fields))
dim6_score = np.mean(prompt_scores("Dim6", dim6_fields))

# 6. FRESHNESS (DIM 4) – Banding Logic
ex_date = input("Extraction date (YYYY-MM-DD): ")
delta_years = (pd.Timestamp.now() - pd.Timestamp(ex_date)).days / 365.0

if delta_years <= 1/12:
    freshness_score = 5
elif delta_years <= 0.5:
    freshness_score = 4
elif delta_years <= 1:
    freshness_score = 3
elif delta_years <= 2:
    freshness_score = 2
elif delta_years <= 5:
    freshness_score = 1
else:
    freshness_score = 0

# 7. PROVENANCE SCORE + DECISION
W = [float(config[f'W{i+1}']) for i in range(6)]
dims = [dim1_score, dim2_score, dim3_score, freshness_score, dim5_score, dim6_score]
pscore = sum([W[i]*dims[i] for i in range(6)])
accept_th = float(config['Accept_Threshold'])
review_th = float(config['Review_Threshold'])
quarantine_th = float(config['Quarantine_Threshold'])
if pscore >= accept_th:
    initial_action = 'ACCEPT'
elif pscore >= review_th:
    initial_action = 'REVIEW'
else:
    initial_action = 'QUARANTINE'
final_action = initial_action  # unless an admin override is added later

# For single-user workflow, client and version are static/monotonic
client_id = input("Enter client ID or pseudonym: ")
ledger_file = f"{client_id}_ledger.csv"
version = 1
if os.path.exists(ledger_file):
    old_df = pd.read_csv(ledger_file)
    if not old_df.empty:
        version = int(old_df['version'].max()) + 1

timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
record_count = len(data)
# Make a hash as tx_id (hash of timestamp+client_id+version+record_count+pscore+action)
hash_input = f"{timestamp}{client_id}{version}{record_count}{pscore}{initial_action}".encode()
tx_id = hashlib.sha256(hash_input).hexdigest()

row = {
    'tx_id': tx_id,
    'version': version,
    'timestamp': timestamp,
    'client': client_id,
    'record_count': record_count,
    'dim1': dim1_score,
    'dim2': dim2_score,
    'dim3': dim3_score,
    'dim4': freshness_score,
    'dim5': dim5_score,
    'dim6': dim6_score,
    'pscore': round(pscore,3),
    'initial_action': initial_action,
    'final_action': final_action
}

# Append to ledger (create if doesn't exist)
ledger_cols = ['tx_id','version','timestamp','client','record_count','dim1','dim2','dim3','dim4','dim5','dim6','pscore','initial_action','final_action']
if os.path.exists(ledger_file):
    ledger_df = pd.read_csv(ledger_file)
    ledger_df = ledger_df.append(row, ignore_index=True)
else:
    ledger_df = pd.DataFrame([row])
ledger_df.to_csv(ledger_file, index=False)

# Show the result table
display(HTML("<h3>Hybrid Provenance Scores & Actions</h3>"))
result_cols = ['dim1','dim2','dim3','dim4','dim5','dim6','pscore','initial_action','record_count']
display(pd.DataFrame([row], columns=result_cols))

# Print ledger (show all entries for this client)
display(HTML(f"<h3>Local Ledger: {ledger_file}</h3>"))
display(ledger_df)


Upload config.xlsx and charter.xlsx


Saving charter.xlsx to charter (1).xlsx
Saving config.xlsx to config (1).xlsx
Upload raw.csv


Saving diabetic_data.csv to diabetic_data (1).csv
Manual scoring based on your project policy and charter.xlsx.
Dim1.Source Reputation (0–5): 5
Dim1.Data Controller (0–5): 5
Dim1.Data Objective (0–5): 5
Dim3.Data Dictionary (0–5): 5
Dim3.Version Logs (0–5): 2
Dim3.Collection Protocol (0–5): 2
Dim3.Updates on Definitions (0–5): 3
Dim5.Regulation Coverage (0–5): 4
Dim5.Explicit Consent (0–5): 5
Dim5.Geo-Restrictions (0–5): 5
Dim5.Sensitivity Classification (0–5): 4
Dim5.Audits & Certifications (0–5): 4
Dim6.License Terms (0–5): 5
Dim6.Ethical Reviews (0–5): 5
Dim6.Redistribution (0–5): 5
Dim6.User Agreements (0–5): 5
Extraction date (YYYY-MM-DD): 2008-12-01
Enter client ID or pseudonym: 1


Unnamed: 0,dim1,dim2,dim3,dim4,dim5,dim6,pscore,initial_action,record_count
0,5.0,2.75,3.0,0,4.4,5.0,3.782,ACCEPT,101766


Unnamed: 0,tx_id,version,timestamp,client,record_count,dim1,dim2,dim3,dim4,dim5,dim6,pscore,initial_action,final_action
0,afbf590c3cf49e246dd7941ef9a48bc62e16621d667400...,1,2025-07-13 09:11:48,1,101766,5.0,2.75,3.0,0,4.4,5.0,3.782,ACCEPT,ACCEPT
