In [1]:
import itertools
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Set

import pandas as pd
from google.colab import files


# ----------------------------
# Configuration
# ----------------------------

# DECISION_TABLE_CSV = "decision_table_crypto_risk.csv"  # upload or mount in Colab
RULES_TXT_OUT = "crypto_risk_rules.txt"

COND_ATTRS = ["Volatility_30d_Level", "Momentum_7d_Direction", "Liquidity_7d_Level"]
DEC_ATTR = "Decision_Risk_Category"

# ----------------------------
# Upload CSV file
# ----------------------------

print("Please upload your CSV decision table file.")
uploaded = files.upload()

if uploaded:
    uploaded_filename = list(uploaded.keys())[0]
    DECISION_TABLE_CSV = uploaded_filename
    print(f"Using uploaded file: {DECISION_TABLE_CSV}")
else:
    print("No file uploaded. Please upload the 'decision_table_crypto_risk.csv' file manually or ensure it's in the correct path.")
    # Fallback to original name if nothing is uploaded, to potentially raise error later
    DECISION_TABLE_CSV = "decision_table_crypto_risk.csv"


# ----------------------------
# Rough set helpers
# ----------------------------

def equivalence_classes(df: pd.DataFrame, attrs: List[str]) -> List[dict]:
    """
    Returns equivalence classes under IND(attrs).
    Each class: key(tuple), objects(Object_IDs), decisions(unique decision values).
    """
    classes = []
    groups = df.groupby(attrs, dropna=False, sort=False)
    for key, g in groups:
        if not isinstance(key, tuple):
            key = (key,)
        classes.append(
            {
                "key": key,
                "objects": g["Object_ID"].tolist(),
                "decisions": sorted(g[DEC_ATTR].unique().tolist()),
            }
        )
    return classes


def indiscernible_pairs(classes: List[dict]) -> List[Tuple[str, str]]:
    pairs: List[Tuple[str, str]] = []
    for c in classes:
        objs = c["objects"]
        if len(objs) < 2:
            continue
        for i in range(len(objs)):
            for j in range(i + 1, len(objs)):
                pairs.append((objs[i], objs[j]))
    return pairs


def rough_approximations(df: pd.DataFrame, attrs: List[str], target_decision: str) -> Tuple[Set[str], Set[str], Set[str]]:
    """
    For decision class X:
      - lower approximation: union of eq classes fully inside X
      - upper approximation: union of eq classes intersecting X
      - boundary: upper - lower
    """
    classes = equivalence_classes(df, attrs)

    lower: Set[str] = set()
    upper: Set[str] = set()

    for c in classes:
        objs = set(c["objects"])
        decisions = set(df.loc[df["Object_ID"].isin(objs), DEC_ATTR].unique())
        if decisions == {target_decision}:
            lower |= objs
            upper |= objs
        elif target_decision in decisions:
            upper |= objs

    boundary = upper - lower
    return lower, upper, boundary


def positive_region(df: pd.DataFrame, attrs: List[str]) -> Set[str]:
    """
    POS_attrs(D): union of IND(attrs) classes that are decision-consistent
    (all objects in class share one decision).
    """
    pos: Set[str] = set()
    for c in equivalence_classes(df, attrs):
        objs = c["objects"]
        if df.loc[df["Object_ID"].isin(objs), DEC_ATTR].nunique() == 1:
            pos |= set(objs)
    return pos


def dependency_degree(df: pd.DataFrame, attrs: List[str]) -> float:
    """
    γ(attrs, D) = |POS_attrs(D)| / |U|
    """
    return len(positive_region(df, attrs)) / len(df)


def all_attribute_subsets(attrs: List[str]) -> List[Tuple[str, ...]]:
    out: List[Tuple[str, ...]] = []
    for r in range(1, len(attrs) + 1):
        out.extend(itertools.combinations(attrs, r))
    return out


def find_reducts(df: pd.DataFrame, cond_attrs: List[str]) -> Tuple[List[Tuple[str, ...]], Tuple[str, ...], Set[str]]:
    """
    Brute-force reduct search (works well for small attribute sets).
    - B is a reduct if γ(B,D) == γ(C,D) and minimal.
    Returns (reducts, chosen_reduct, core).
    """
    gamma_full = dependency_degree(df, cond_attrs)

    candidates = []
    for subset in all_attribute_subsets(cond_attrs):
        g = dependency_degree(df, list(subset))
        if abs(g - gamma_full) < 1e-12:
            candidates.append(subset)

    reducts: List[Tuple[str, ...]] = []
    for s in sorted(candidates, key=len):
        # skip if a smaller reduct already contained in s
        if any(set(r).issubset(set(s)) for r in reducts):
            continue

        # ensure minimal
        is_minimal = True
        for r in range(1, len(s)):
            for t in itertools.combinations(s, r):
                if t in candidates:
                    is_minimal = False
                    break
            if not is_minimal:
                break

        if is_minimal:
            reducts.append(s)

    chosen = sorted(reducts, key=lambda x: (len(x), list(x)))[0] if reducts else tuple(cond_attrs)

    core = set(reducts[0]) if reducts else set(cond_attrs)
    for r in reducts[1:]:
        core &= set(r)

    return reducts, chosen, core


# ----------------------------
# Rule base + inference engine
# ----------------------------

@dataclass(frozen=True)
class Rule:
    conditions: Dict[str, str]     # attribute -> value
    decision_value: str            # DEC_ATTR value

    def matches(self, facts: Dict[str, str]) -> bool:
        return all(facts.get(a) == v for a, v in self.conditions.items())


def rules_from_table(df: pd.DataFrame, reduct: Tuple[str, ...]) -> List[Rule]:
    """
    Build compact rules from the table using a chosen reduct.
    """
    rules: List[Rule] = []
    grouped = df.groupby(list(reduct), dropna=False, sort=True)
    for key, g in grouped:
        if not isinstance(key, tuple):
            key = (key,)

        decisions = g[DEC_ATTR].unique()
        if len(decisions) != 1:
            continue  # skip inconsistent groups (not expected here)

        conds = {attr: str(val) for attr, val in zip(reduct, key)}
        rules.append(Rule(conditions=conds, decision_value=str(decisions[0])))

    return rules


def save_rules_txt(rules: List[Rule], filepath: str) -> None:
    lines = []
    for r in rules:
        conds = " AND ".join([f"{a}={v}" for a, v in r.conditions.items()])
        lines.append(f"IF {conds} THEN {DEC_ATTR}={r.decision_value}")
    with open(filepath, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))


def forward_chain(facts: Dict[str, str], rules: List[Rule]) -> Optional[str]:
    """
    Data-driven: first matching rule returns a decision.
    """
    for r in rules:
        if r.matches(facts):
            return r.decision_value
    return None


def backward_chain(goal: str, facts: Dict[str, str], rules: List[Rule]) -> bool:
    """
    Goal-driven: can we prove DEC_ATTR=goal from facts?
    """
    for r in rules:
        if r.decision_value != goal:
            continue
        if r.matches(facts):
            return True
    return False


# ----------------------------
# Reporting (Stage 2 outputs)
# ----------------------------

def print_stage2_analysis(df: pd.DataFrame) -> Tuple[List[Rule], Tuple[str, ...]]:
    print("=== 2.1 Decision classes ===")
    decisions = sorted(df[DEC_ATTR].unique().tolist())
    for d in decisions:
        sub = df[df[DEC_ATTR] == d]
        print(f"\nClass: {d} (count={len(sub)})")
        for a in COND_ATTRS:
            dist = sub[a].value_counts(normalize=True)
            dist_str = ", ".join([f"{k}:{v:.3f}" for k, v in dist.items()])
            print(f"  - {a}: {dist_str}")

    print("\n=== 2.2 Indiscernibility relations (IND(B)) ===")
    full_classes = equivalence_classes(df, COND_ATTRS)
    pairs = indiscernible_pairs(full_classes)
    print(f"Using B={COND_ATTRS} -> #equivalence classes={len(full_classes)}, #pairs={len(pairs)}")
    print("Equivalence classes with >=2 objects:")
    for c in full_classes:
        if len(c["objects"]) < 2:
            continue
        key_map = {a: v for a, v in zip(COND_ATTRS, c["key"])}
        print(f"  * {key_map} -> size={len(c['objects'])}, decisions={c['decisions']}")

    print("\nCompare IND(B) across subsets:")
    for subset in all_attribute_subsets(COND_ATTRS):
        classes = equivalence_classes(df, list(subset))
        print(f"  B={list(subset)} -> #classes={len(classes)}")

    print("\n=== 2.3 Rough approximations ===")
    for d in decisions:
        low, upp, bnd = rough_approximations(df, COND_ATTRS, d)
        print(f"Decision={d}: |LOW|={len(low)}, |UPP|={len(upp)}, |BND|={len(bnd)}")

    print("\n=== 2.4 Quality metrics ===")
    U_size = len(df)
    for d in decisions:
        low, upp, _ = rough_approximations(df, COND_ATTRS, d)
        acc = (len(low) / len(upp)) if len(upp) else 0.0
        print(f"Decision={d}: accuracy={acc:.3f}, |LOW|/|U|={len(low)/U_size:.3f}, |UPP|/|U|={len(upp)/U_size:.3f}")

    print("\n=== 2.5 Attribute reduction ===")
    gamma_full = dependency_degree(df, COND_ATTRS)
    print(f"γ(C,D) for full C={COND_ATTRS}: {gamma_full:.3f}")
    reducts, chosen_reduct, core = find_reducts(df, COND_ATTRS)
    print(f"Reducts: {[list(r) for r in reducts]}")
    print(f"CORE: {sorted(core)}")

    removed = [a for a in COND_ATTRS if a not in chosen_reduct]
    if removed:
        print(f"Redundant attribute(s) in this table: {removed}")

    print("\n=== 2.6 Decision rules (from chosen reduct) ===")
    rules = rules_from_table(df, chosen_reduct)
    support = df.groupby(list(chosen_reduct)).size().to_dict()
    for r in rules:
        key = tuple(r.conditions[a] for a in chosen_reduct)
        print(f"IF {r.conditions} THEN {DEC_ATTR}={r.decision_value}  (support={support.get(key, 0)})")

    return rules, chosen_reduct


# ----------------------------
# Console UI (Stage 3.3)
# ----------------------------

def prompt_choice(prompt: str, options: List[str]) -> str:
    opt_set = {o.lower(): o for o in options}
    while True:
        raw = input(f"{prompt} {options}: ").strip().lower()
        if raw in opt_set:
            return opt_set[raw]
        print("Invalid choice. Try again.")


def run_console_app(df: pd.DataFrame, rules: List[Rule], reduct: Tuple[str, ...]) -> None:
    print("\n=== Crypto Risk Expert System (Console) ===")
    print("Enter attribute values. Type Ctrl+C to exit.\n")

    allowed: Dict[str, List[str]] = {}
    for a in reduct:
        allowed[a] = sorted({str(v) for v in df[a].unique().tolist()})

    while True:
        facts: Dict[str, str] = {}
        for a in reduct:
            facts[a] = prompt_choice(f"Enter {a}", allowed[a])

        mode = prompt_choice("Choose inference mode", ["forward", "backward"])
        if mode == "forward":
            decision = forward_chain(facts, rules)
            print(f"\nDecision: {decision if decision else 'Unknown (no rule matched)'}\n")
        else:
            goal = prompt_choice(f"Choose goal value for {DEC_ATTR}", sorted({r.decision_value for r in rules}))
            ok = backward_chain(goal, facts, rules)
            print(f"\nGoal {DEC_ATTR}={goal}: {'PROVED' if ok else 'NOT PROVED'}\n")


# ----------------------------
# Main
# ----------------------------

if __name__ == "__main__":
    df = pd.read_csv(DECISION_TABLE_CSV)

    rules, chosen_reduct = print_stage2_analysis(df)

    save_rules_txt(rules, RULES_TXT_OUT)
    print(f"\nSaved rule base to: {RULES_TXT_OUT}")

    run_console_app(df, rules, chosen_reduct)


Please upload your CSV decision table file.


Saving decision_table_crypto_risk.csv to decision_table_crypto_risk.csv
Using uploaded file: decision_table_crypto_risk.csv
=== 2.1 Decision classes ===

Class: High (count=15)
  - Volatility_30d_Level: High:0.867, Medium:0.133
  - Momentum_7d_Direction: Up:0.467, Neutral:0.467, Down:0.067
  - Liquidity_7d_Level: Low:0.733, Medium:0.267

Class: Medium (count=15)
  - Volatility_30d_Level: Medium:1.000
  - Momentum_7d_Direction: Up:0.667, Neutral:0.333
  - Liquidity_7d_Level: Medium:0.600, High:0.400

=== 2.2 Indiscernibility relations (IND(B)) ===
Using B=['Volatility_30d_Level', 'Momentum_7d_Direction', 'Liquidity_7d_Level'] -> #equivalence classes=11, #pairs=42
Equivalence classes with >=2 objects:
  * {'Volatility_30d_Level': 'Medium', 'Momentum_7d_Direction': 'Neutral', 'Liquidity_7d_Level': 'High'} -> size=2, decisions=['Medium']
  * {'Volatility_30d_Level': 'Medium', 'Momentum_7d_Direction': 'Up', 'Liquidity_7d_Level': 'High'} -> size=4, decisions=['Medium']
  * {'Volatility_30d_L

KeyboardInterrupt: Interrupted by user