In [None]:
import xml.etree.ElementTree as ET
import json
import uuid
from typing import Dict, Any, List, Set, Tuple

# ======================
# Open-PSA XML -> Model
# ======================

def _random_model_name() -> str:
    """Return a throwaway model name of the form ``model-<8 hex digits>``.

    The suffix is the first eight hex digits of a freshly generated random
    UUID, so successive calls produce different names.
    """
    suffix = uuid.uuid4().hex[:8]
    return "model-" + suffix

# ------- XML helpers -------

def _strip(tag: str) -> str:
    """Return *tag* without a leading ``{namespace}`` qualifier.

    ElementTree spells namespaced tags as ``{uri}local``. Everything up to and
    including the first ``}`` is dropped; a tag containing no ``}`` is
    returned unchanged, and a falsy tag (``None`` or ``''``) yields ``''``.
    """
    if not tag:
        return ''
    _, brace, local = tag.partition('}')
    return local if brace else tag

def _first_child(el: ET.Element, name: str):
    """Return the first direct child of *el* whose tag equals *name*.

    Child tags are compared after their namespace qualifier is removed with
    ``_strip``. Returns ``None`` when *el* is ``None`` or no child matches.
    """
    # Compare against None explicitly: the check must not depend on the
    # truthiness of the element object itself.
    if el is None:
        return None
    matching = (child for child in el if _strip(child.tag) == name)
    return next(matching, None)

# ------- Fault tree parsing (DFS) -------

def _build_gate_map(ft_elem: ET.Element) -> Dict[str, ET.Element]:
    """Index every ``define-gate`` descendant of *ft_elem* by its ``name``.

    Definitions with a missing or empty ``name`` attribute are skipped. When
    a name is defined more than once, the later definition (document order)
    replaces the earlier one. The search uses the bare tag ``define-gate``,
    i.e. without any namespace qualifier.
    """
    return {
        gate_el.get('name'): gate_el
        for gate_el in ft_elem.findall('.//define-gate')
        if gate_el.get('name')
    }


def _collect_referenced_gates(gate_node: ET.Element) -> Set[str]:
    """Return the names of all gates referenced from inside *gate_node*.

    Every descendant of *gate_node* is inspected (the node itself is not).
    A descendant counts as a gate reference when it is either

    * a ``<gate name="..."/>`` element, or
    * a generic ``<event ... type="gate"/>`` element, which ``_parse_expr``
      also expands as a gate reference. For these, the name may come from a
      ``reference`` child when the ``name`` attribute is absent, mirroring
      the lookup ``_parse_expr`` performs.

    References that end up without a non-empty name are ignored.
    """
    refs: Set[str] = set()
    # Explicit stack instead of recursion; the result is a set, so the
    # visiting order does not matter.
    pending: List[ET.Element] = list(gate_node)
    while pending:
        node = pending.pop()
        tag = _strip(node.tag)
        name = None
        if tag == 'gate':
            name = node.get('name')
        elif tag == 'event' and (node.get('type') or '').strip() == 'gate':
            name = node.get('name')
            if not name:
                ref = _first_child(node, 'reference')
                if ref is not None:
                    name = ref.get('name')
        if name:
            refs.add(name)
        pending.extend(node)
    return refs


def _find_top_candidates(ft_elem: ET.Element, gates: Dict[str, ET.Element]) -> List[str]:
    """Return the names of gates that no gate in *gates* references.

    Args:
        ft_elem: The fault-tree element. Accepted for interface
            compatibility; it is not read here.
        gates: Gate definitions keyed by name.

    Returns:
        The unreferenced ("root") gate names, in the iteration order of
        *gates*. Keeping that order (instead of iterating a set of names)
        makes the result identical from run to run, so anything built from
        the list is reproducible. A gate that references itself counts as
        referenced.
    """
    referenced: Set[str] = set()
    for gate_node in gates.values():
        referenced |= _collect_referenced_gates(gate_node)
    return [name for name in gates if name not in referenced]


def _parse_expr(node: ET.Element, gates: Dict[str, ET.Element], visiting: Set[str]) -> Dict[str, Any]:
    """Convert one formula element into a nested expression dict.

    Result shapes:

    * ``{'event': name}`` - a leaf. ``name`` is ``''`` when none could be
      determined, and ``'<TRUE>'`` / ``'<FALSE>'`` for ``constant`` elements.
    * ``{'op': 'and'|'or'|'xor'|'nand'|'nor', 'args': [...]}``; an ``and`` /
      ``or`` with exactly one argument collapses to that argument.
    * ``{'op': 'not', 'arg': expr}``
    * ``{'op': 'atleast', 'k': int, 'args': [...]}`` for both ``atleast`` and
      ``cardinality`` (only the ``min`` attribute is read).

    Args:
        node: The element to convert. Its tag is compared after namespace
            stripping.
        gates: Gate definitions keyed by name; gate references are expanded
            inline from this map.
        visiting: Names of gates currently being expanded. Used as a cycle
            guard and mutated during the call; it is restored before
            returning.

    Returns:
        The expression dict for *node*.
    """
    t = _strip(node.tag)

    # Gate reference: expand the referenced definition inline (DFS).
    if t == 'gate':
        name = node.get('name')
        if not name:
            return {'event': ''}
        gnode = gates.get(name)
        # Unknown gates, and gates already on the expansion stack (a cycle),
        # stay as plain named leaves instead of being expanded.
        if gnode is None or name in visiting:
            return {'event': name}
        # A definition may carry <label>/<attributes> metadata ahead of its
        # formula, so the formula is the first child that is neither of them.
        formula = next(
            (ch for ch in gnode if _strip(ch.tag) not in ('label', 'attributes')),
            None,
        )
        if formula is None:
            return {'event': name}
        visiting.add(name)
        try:
            return _parse_expr(formula, gates, visiting)
        finally:
            # Always unwind the cycle guard, even if parsing raised.
            visiting.discard(name)

    # Leaf events and the generic <event>.
    if t in ('basic-event', 'house-event', 'event'):
        name = node.get('name')
        if not name:
            # Fall back to a <reference name="..."/> child.
            ref = _first_child(node, 'reference')
            if ref is not None:
                name = ref.get('name')
        etype = (node.get('type') or '').strip()
        if t == 'event' and etype == 'gate' and name:
            # A typed event pointing at a gate is expanded like <gate>.
            return _parse_expr(ET.Element('gate', {'name': name}), gates, visiting)
        return {'event': name or ''}

    # Constants become the pseudo-events '<TRUE>' / '<FALSE>'; any value
    # other than "true" (case-insensitive) is treated as false.
    if t == 'constant':
        value = (node.get('value') or '').strip().lower()
        return {'event': '<TRUE>' if value == 'true' else '<FALSE>'}

    # Boolean connectives.
    if t in ('and', 'or', 'xor', 'nand', 'nor'):
        args: List[Dict[str, Any]] = [_parse_expr(ch, gates, visiting) for ch in node]
        if t in ('and', 'or') and len(args) == 1:
            return args[0]
        return {'op': t, 'args': args}

    if t == 'not':
        children = list(node)
        operand = _parse_expr(children[0], gates, visiting) if children else {'event': ''}
        return {'op': 'not', 'arg': operand}

    # <cardinality> is mapped onto 'atleast' using its min attribute only.
    if t in ('atleast', 'cardinality'):
        try:
            k = int(node.get('min') or '0')
        except (TypeError, ValueError):
            # A malformed min falls back to 0 rather than aborting.
            k = 0
        return {
            'op': 'atleast',
            'k': k,
            'args': [_parse_expr(ch, gates, visiting) for ch in node],
        }

    # Any other tag: pass through to its first child, or an empty leaf.
    children = list(node)
    if children:
        return _parse_expr(children[0], gates, visiting)
    return {'event': ''}


def _collect_used_events(expr: Dict[str, Any], used_basic: Set[str], used_house: Set[str]):
    """Gather the leaf names of an expression tree into the two given sets.

    Leaves named ``'<TRUE>'`` or ``'<FALSE>'`` are added to *used_house*;
    every other non-empty leaf name is added to *used_basic*. Anything that
    is not a dict is ignored. Both sets are mutated in place and nothing is
    returned.
    """
    todo = [expr]
    while todo:
        current = todo.pop()
        if not isinstance(current, dict):
            continue
        if 'event' in current:
            label = current['event'] or ''
            if label in ('<TRUE>', '<FALSE>'):
                used_house.add(label)
            elif label:
                used_basic.add(label)
            continue
        if current.get('op') == 'not':
            # 'not' carries its single operand under 'arg'.
            todo.append(current.get('arg'))
        else:
            todo.extend(current.get('args', []) or [])

# ------- CCF parsing -------

def _first_float_value(el: ET.Element):
    """Return the ``value`` of the first ``float`` descendant of *el* as a float.

    Only the first ``float`` descendant is considered. ``None`` is returned
    when *el* is ``None``, when there is no such descendant, when it has no
    ``value`` attribute, or when that attribute cannot be converted.
    """
    if el is None:
        return None
    float_el = el.find('.//float')
    raw = float_el.get('value') if float_el is not None else None
    if raw is None:
        return None
    try:
        return float(raw)
    except Exception:
        return None


def _parse_ccf_groups(root: ET.Element) -> List[Dict[str, Any]]:
    """Extract every ``define-CCF-group`` below *root* as a plain dict.

    Groups lacking a ``name`` or a ``model`` attribute are skipped. Each
    returned dict always has ``name``, ``model`` and ``members`` (a list of
    basic-event names) and may additionally have:

    * ``description`` - non-empty text of the group's ``label`` child,
    * ``distribution`` - first float value under its ``distribution`` child,
    * ``factors`` - a list of dicts with an optional integer ``level`` and an
      optional float ``value``, taken from ``factor`` elements inside a
      ``factors`` wrapper followed by ``factor`` elements directly under the
      group.
    """
    parsed: List[Dict[str, Any]] = []
    for group_el in root.findall('.//define-CCF-group'):
        group_name = group_el.get('name') or ''
        group_model = group_el.get('model') or ''
        if not (group_name and group_model):
            continue

        # Description comes from the label text, when there is any.
        label_el = _first_child(group_el, 'label')
        description = None
        if label_el is not None and label_el.text is not None:
            description = label_el.text

        # Member names: the name attribute, else a <reference> child's name.
        member_names: List[str] = []
        members_el = _first_child(group_el, 'members')
        if members_el is not None:
            for member_el in members_el.findall('basic-event'):
                member_name = member_el.get('name')
                if not member_name:
                    ref_el = _first_child(member_el, 'reference')
                    if ref_el is not None:
                        member_name = ref_el.get('name')
                if member_name:
                    member_names.append(member_name)

        # _first_float_value returns None for a missing element.
        distribution = _first_float_value(_first_child(group_el, 'distribution'))

        # Wrapped factors first, then factors placed directly on the group.
        wrapper_el = _first_child(group_el, 'factors')
        factor_els: List[ET.Element] = []
        if wrapper_el is not None:
            factor_els += wrapper_el.findall('factor')
        factor_els += [child for child in group_el if _strip(child.tag) == 'factor']

        factor_entries: List[Dict[str, Any]] = []
        for factor_el in factor_els:
            item: Dict[str, Any] = {}
            raw_level = factor_el.get('level')
            if raw_level is not None:
                try:
                    item['level'] = int(raw_level)
                except Exception:
                    # An unparsable level is simply left out.
                    pass
            factor_value = _first_float_value(factor_el)
            if factor_value is not None:
                item['value'] = factor_value
            if item:
                factor_entries.append(item)

        group: Dict[str, Any] = {
            'name': group_name,
            'model': group_model,
            'members': member_names,
        }
        if description:
            group['description'] = description
        if distribution is not None:
            group['distribution'] = distribution
        if factor_entries:
            group['factors'] = factor_entries
        parsed.append(group)
    return parsed

# ------- Model builders -------

def build_scram_node_options() -> Dict[str, Any]:
    """Return a fresh dict of default analysis settings.

    Only ``bdd`` and ``probability`` are ``True``; every other flag is
    ``False``. A new dict is built on each call.
    """
    # Optional numeric settings that are currently not emitted:
    #   limitOrder, cutOff, missionTime, timeStep,
    #   numTrials, numQuantiles, numBins, seed
    flag_order = (
        'mocus',
        'bdd',
        'zbdd',
        'rareEvent',
        'mcub',
        'primeImplicants',
        'probability',
        'importance',
        'uncertainty',
        'ccf',
        'sil',
    )
    switched_on = ('bdd', 'probability')
    return {flag: flag in switched_on for flag in flag_order}


def scram_node_options_ts_literal() -> str:
    return """
// ScramNodeOptions literal
const options: ScramNodeOptions = {
  mocus: false,
  bdd: true,
  zbdd: false,
  rareEvent: false,
  mcub: false,
  // limitOrder: 0,
  // cutOff: 0,
  // missionTime: 0,
  // timeStep: 0,
  // numTrials: 0,
  // numQuantiles: 0,
  // numBins: 0,
  // seed: 0,
  primeImplicants: false,
  probability: true,
  importance: false,
  uncertainty: false,
  ccf: false,
  sil: false,
};
""".strip()


def convert_xml_to_model(xml_text: str) -> Dict[str, Any]:
    """Convert an Open-PSA XML document into a plain model dict.

    Args:
        xml_text: The XML document text.

    Returns:
        A dict with:

        * ``faultTrees`` - one entry per ``define-fault-tree``, each holding
          ``name``, ``top`` (the expression dict of its top event) and, when
          non-empty, ``basicEvents`` (``{'name', 'p'}``) and ``houseEvents``
          (``{'name', 'state'}``) for the events the top expression uses.
        * ``ccfGroups`` - present only when CCF groups were found.
        * ``name`` - a freshly generated random placeholder; no name is read
          from the document.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_text* is not well-formed.
    """
    root = ET.fromstring(xml_text)
    model: Dict[str, Any] = {'faultTrees': []}

    # Value tables built from every define-basic-event / define-house-event
    # anywhere in the document, keyed by event name.
    be_values: Dict[str, float] = {}
    for be in root.findall('.//define-basic-event'):
        nm = be.get('name')
        val = _first_float_value(be)
        if nm and val is not None:
            be_values[nm] = val

    house_values: Dict[str, bool] = {}
    for he in root.findall('.//define-house-event'):
        nm = he.get('name')
        if not nm:
            continue
        c = he.find('.//constant')
        if c is not None and c.get('value') is not None:
            house_values[nm] = c.get('value').strip().lower() == 'true'

    for ft in root.findall('.//define-fault-tree'):
        gates = _build_gate_map(ft)
        tops = _find_top_candidates(ft, gates)

        top_expr: Dict[str, Any]
        if len(tops) == 1:
            top_node = gates.get(tops[0])
            formula = None
            if top_node is not None:
                # A gate definition may start with <label>/<attributes>
                # metadata; its formula is the first child that is neither.
                formula = next(
                    (ch for ch in top_node
                     if _strip(ch.tag) not in ('label', 'attributes')),
                    None,
                )
            if formula is not None:
                top_expr = _parse_expr(formula, gates, set())
            else:
                top_expr = {'event': ''}
        else:
            # No unique root: synthesize a top OR over all root gates, or
            # over every gate when none is unreferenced.
            root_names = tops or list(gates)
            args = [
                _parse_expr(ET.Element('gate', {'name': nm}), gates, set())
                for nm in root_names
            ]
            top_expr = args[0] if len(args) == 1 else {'op': 'or', 'args': args}

        # One walk yields both the named leaves and the constant
        # pseudo-events ('<TRUE>' / '<FALSE>').
        used_names: Set[str] = set()
        used_house_consts: Set[str] = set()
        _collect_used_events(top_expr, used_names, used_house_consts)

        # Sorted so the emitted lists are stable; a name only appears in a
        # list when the document defines a value for it.
        ordered_names = sorted(used_names)
        basic_events: List[Dict[str, Any]] = [
            {'name': nm, 'p': be_values[nm]}
            for nm in ordered_names if nm in be_values
        ]
        house_events: List[Dict[str, Any]] = [
            {'name': nm, 'state': house_values[nm]}
            for nm in ordered_names if nm in house_values
        ]
        house_events += [
            {'name': nm, 'state': nm == '<TRUE>'}
            for nm in sorted(used_house_consts)
        ]

        ft_obj: Dict[str, Any] = {'name': ft.get('name'), 'top': top_expr}
        if basic_events:
            ft_obj['basicEvents'] = basic_events
        if house_events:
            ft_obj['houseEvents'] = house_events
        model['faultTrees'].append(ft_obj)

    # CCF groups live at model level.
    ccf_groups = _parse_ccf_groups(root)
    if ccf_groups:
        model['ccfGroups'] = ccf_groups

    # The model always gets a generated placeholder name.
    model['name'] = _random_model_name()

    return model


def convert_xml_to_request(xml_text: str) -> Dict[str, Any]:
    """Build a request dict from Open-PSA XML text.

    The result has two keys, in this order: ``settings`` (the defaults from
    ``build_scram_node_options``) and ``model`` (the output of
    ``convert_xml_to_model`` for *xml_text*).
    """
    # Convert first so a parse failure surfaces before anything else is built.
    converted = convert_xml_to_model(xml_text)
    return dict(settings=build_scram_node_options(), model=converted)

# # Example usage (local)
# with open('fixtures/models/aralia-fault-tree-dataset/data/openpsa/chinese.xml','r',encoding='utf-8') as f:
#     xml_text = f.read()
# model = convert_xml_to_model(xml_text)
# print(json.dumps({ 'model': model }, indent=2))



In [None]:
# Google Colab upload/convert/download
# Colab is detected by attempting its import; any failure is treated as
# "not running in Colab" rather than an error.
try:
    from google.colab import files  # type: ignore
except Exception:
    _COLAB = False
else:
    _COLAB = True

if not _COLAB:
    print("Not running in Colab. To use file upload, open this notebook in Google Colab.")
else:
    print("Upload an Open-PSA XML file...")
    uploaded = files.upload()
    if uploaded:
        # Only the first uploaded file is converted.
        filename = next(iter(uploaded))
        xml_text = uploaded[filename].decode('utf-8')
        req = convert_xml_to_request(xml_text)
        output_request = json.dumps(req, indent=2)
        # Output name: the upload's name up to its last dot + "_request.json".
        out_req_name = f"{filename.rsplit('.', 1)[0]}_request.json"
        with open(out_req_name, 'w', encoding='utf-8') as f:
            f.write(output_request)
        print(f"Converted NodeQuantRequest → {out_req_name}")
        files.download(out_req_name)

