# Update Interface Example from CSV 


### Model-Specific Code (Do Not Modify)

This section contains code that is specific to the system model. It is updated only when the model is changed and should not require user modifications under normal circumstances.

If a new model is introduced, ensure this section is reviewed and updated as needed.


In [1]:
#!pip install --upgrade git+https://github.com/tkSDISW/Capella_Tools 
import capellambse.decl

from capella_tools import capellambse_helper


from IPython import display as diag_display
# Named resources handed to MelodyModel through its `resources=` argument.
# NOTE(review): the path is presumably resolved relative to the notebook's
# working directory — confirm before moving the notebook.
resources = {
    "Interface_Control_Example": "Interface_Control_Example/Interface_Control_Example/Interface_Control_Example",
}
# Location of the model's .aird file, one directory above this notebook.
path_to_model = "../Interface_Control_Example.aird"
# `model` is the handle the later cells read from, modify, and finally save.
model = capellambse.MelodyModel(path_to_model, resources=resources)


Find interface

In [2]:
# Name of the component exchange (interface) that this notebook updates.
INTERFACE_NAME = "hss_hsss_to_agss"

# Pick the first logical-architecture component exchange with that name.
# Exchange names are expected to be unique, so "first" is the only match.
interface = next(
    (obj for obj in model.la.component_exchanges if obj.name == INTERFACE_NAME),
    None,
)

# Fail here with a clear message. Otherwise a misspelled name (or the wrong
# model) would only surface in a later cell as a NameError on `interface`, or,
# worse, silently reuse an `interface` left over from an earlier kernel run.
if interface is None:
    raise LookupError(
        f"No component exchange named {INTERFACE_NAME!r} found in model.la.component_exchanges"
    )

print("Found:", interface.name, "uuid:", interface.uuid)


Found: hss_hsss_to_agss uuid: 304dad2a-1d26-46f3-a8d4-b3ae841c571b


Locate the .csv files

In [3]:
from __future__ import annotations
import os
import sys
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional, Dict, List, Tuple

import pandas as pd


# -------- Logging -------------------------------------------------------------

def setup_interface_logger(level: int = logging.INFO) -> logging.Logger:
    """
    Return the "interfaces" logger, set to `level` and writing to stdout.

    Safe to call repeatedly (for example when a notebook cell is re-run, or to
    change the level): a stream handler is attached only if the logger does
    not already carry one. The "pandas" logger is set to WARNING as well.
    """
    iface_logger = logging.getLogger("interfaces")
    iface_logger.setLevel(level)

    # Look for an existing stream handler so re-running the cell does not
    # produce duplicated log lines.
    has_stream_handler = False
    for handler in iface_logger.handlers:
        if isinstance(handler, logging.StreamHandler):
            has_stream_handler = True
            break

    if not has_stream_handler:
        stdout_handler = logging.StreamHandler(stream=sys.stdout)
        stdout_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
        iface_logger.addHandler(stdout_handler)

    # Keep pandas quiet even when this logger runs at DEBUG.
    logging.getLogger("pandas").setLevel(logging.WARNING)
    return iface_logger


# Module-wide default logger; switch to DEBUG while troubleshooting.
log = setup_interface_logger(logging.INFO)


# -------- Directory search ----------------------------------------------------

def find_first_matching_dir(
    root: str | Path,
    name_parts: Iterable[str],
    *,
    recurse: bool = False,
    case_insensitive: bool = True,
    logger: logging.Logger = log,
) -> Path:
    """
    Return the first directory under `root` whose name contains one of the
    `name_parts` substrings.

    The substrings are tried in the order given; the first substring that
    matches any directory wins. Among the directories matching that substring
    the first one in a deterministic, name-sorted traversal is returned, so the
    result does not depend on the order in which the filesystem lists entries.

    Args:
        root: Directory to search in (``~`` is expanded, the path is resolved).
        name_parts: Substrings to look for. A single bare string is treated as
            one substring rather than as a sequence of characters. Empty
            substrings are ignored because they would match every directory.
        recurse: If False only the immediate children of `root` are scanned;
            otherwise the whole tree is walked with ``os.walk``.
        case_insensitive: Compare lower-cased names when True.
        logger: Logger that receives progress messages.

    Returns:
        Path of the selected directory.

    Raises:
        FileNotFoundError: `root` is not a directory, or nothing matched.
        ValueError: `name_parts` contains no usable substring.
    """
    root = Path(root).expanduser().resolve()
    logger.info(f"Searching under root: {root}")
    if not root.is_dir():
        raise FileNotFoundError(f"Root path not found or not a directory: {root}")

    # A bare string would otherwise be iterated character by character.
    raw_parts = [name_parts] if isinstance(name_parts, str) else list(name_parts)
    # "" is a substring of every name, so it carries no information.
    parts = [part for part in raw_parts if part]
    if not parts:
        raise ValueError("Provide at least one name substring in `name_parts`.")

    def norm(s: str) -> str:
        return s.lower() if case_insensitive else s

    if not recurse:
        # iterdir() yields entries in arbitrary order; sort for a stable "first".
        subdirs = sorted(p for p in root.iterdir() if p.is_dir())
    else:
        subdirs = []
        for dirpath, dirnames, _ in os.walk(root):
            # Sorting in place fixes both the order recorded here and the
            # order in which os.walk descends into the subdirectories.
            dirnames.sort()
            subdirs.extend(Path(dirpath) / d for d in dirnames)

    logger.debug(f"Found {len(subdirs)} subdirectories to scan (recurse={recurse}).")

    # Respect caller's order of name_parts: first part with any match wins
    for part in parts:
        npart = norm(part)
        logger.info(f"Trying name part: '{part}'")
        matches = [d for d in subdirs if npart in norm(d.name)]
        logger.debug(f"Matches for '{part}': {[m.name for m in matches]}")
        if matches:
            chosen = matches[0]
            logger.info(f"Selected directory: {chosen}")
            return chosen

    tried = ", ".join(parts)
    raise FileNotFoundError(
        f"No directory under {root} matched any of: {tried}\n"
        f"Scanned {'recursively' if recurse else 'top-level'} {len(subdirs)} dirs."
    )


# -------- CSV loader objects --------------------------------------------------

@dataclass
class InterfaceCSV:
    """Typed wrapper over a CSV with columns:
       source_index, title, section, property_name, property_value

    `property_name` and `property_value` are required; `title` and `section`
    are optional and are created empty when the file lacks them.

    Attributes are the file `path`, the loaded DataFrame `df`, and the `logger`
    used for diagnostics.
    """
    path: Path
    df: pd.DataFrame
    logger: logging.Logger = log

    @classmethod
    def load(cls, path: str | Path, *, logger: logging.Logger = log) -> "InterfaceCSV":
        """Read `path` into a DataFrame and wrap it.

        Validation problems found after loading are logged as warnings; they do
        not abort the load (call `validate()` to make them raise).

        Raises:
            FileNotFoundError: `path` is not an existing file.
            ValueError: a required column is missing.
            Exception: whatever `pd.read_csv` raises is logged and re-raised.
        """
        p = Path(path)
        logger.info(f"Loading CSV: {p}")
        if not p.is_file():
            raise FileNotFoundError(f"CSV not found: {p}")

        try:
            df = pd.read_csv(p)
        except Exception as e:
            logger.error(f"Failed to read CSV '{p}': {e}")
            raise

        logger.info(f"Loaded {len(df)} rows, columns: {list(df.columns)}")

        # Required columns check. This must run before any column is
        # back-filled, otherwise it can never detect a missing column.
        required = {"property_name", "property_value"}
        missing = required - set(map(str, df.columns))
        if missing:
            msg = f"{p.name} missing required columns: {sorted(missing)}"
            logger.error(msg)
            raise ValueError(msg)

        # Optional columns the query helpers rely on: create them empty.
        for col in ("title", "section"):
            if col not in df.columns:
                logger.warning(f"Column '{col}' missing in {p.name}; creating with None")
                df[col] = None

        # Basic cleanups & dtype expectations.
        # Stringify the names but leave missing values missing: a plain
        # astype(str) would turn NaN into the text "nan" and hide it from
        # validate() and from every dropna() below.
        names = df["property_name"]
        df["property_name"] = names.astype(object).where(names.isna(), names.astype(str))
        # Keep property_value as object; caller may cast later as needed

        ic = cls(path=p, df=df, logger=logger)
        ic._log_quick_peek()
        problems = ic.validate(raise_on_error=False)
        if problems:
            logger.warning("Validation warnings:\n  - " + "\n  - ".join(problems))
        return ic

    # ---- Diagnostics

    def _log_quick_peek(self, n: int = 5) -> None:
        """Log the first `n` rows plus the sections and a few titles at DEBUG.

        Purely diagnostic and best-effort: any failure is logged and ignored.
        """
        try:
            self.logger.debug("Quick peek (head):")
            self.logger.debug("\n" + str(self.df.head(n)))
            self.logger.debug("Unique sections: " + str(self.df["section"].dropna().unique().tolist()))
            self.logger.debug("Unique titles (first 3): " + str(self.df["title"].dropna().unique().tolist()[:3]))
        except Exception as e:
            self.logger.debug(f"Quick peek skipped: {e}")

    def debug_summary(self, n_titles: int = 5) -> Dict[str, object]:
        """Return a summary dict you can print or inspect in a cell."""
        return {
            "path": str(self.path),
            "rows": len(self.df),
            "columns": list(self.df.columns),
            "null_counts": self.df.isna().sum().to_dict(),
            "section_counts": self.df["section"].value_counts(dropna=False).to_dict(),
            "sample_titles": self.df["title"].dropna().unique().tolist()[:n_titles],
            "sample_props": self.df["property_name"].dropna().unique().tolist()[:10],
        }

    def validate(self, *, raise_on_error: bool = True) -> List[str]:
        """Light validation; returns list of problems and optionally raises.

        Checks for: zero rows, missing `property_name` values, duplicate
        (title, section, property_name) rows, and more than 25 % missing
        `property_value` entries.

        Raises:
            ValueError: if `raise_on_error` is True and any problem was found.
        """
        problems: List[str] = []
        # Empty?
        if self.df.empty:
            problems.append("CSV has zero rows.")

        # property_name must not be missing
        if self.df["property_name"].isna().any():
            problems.append("Some rows have NaN in 'property_name'.")

        # Duplicates per (title, property_name, section) can be suspicious
        if all(c in self.df.columns for c in ("title", "section", "property_name")):
            dup_mask = self.df.duplicated(subset=["title", "section", "property_name"], keep=False)
            dup_count = int(dup_mask.sum())
            if dup_count:
                problems.append(f"Found {dup_count} duplicate rows by (title, section, property_name).")

        # Warn if many missing values (the ratio is NaN for an empty frame,
        # which compares False and is already reported above).
        nan_ratio = float(self.df["property_value"].isna().mean())
        if nan_ratio > 0.25:
            problems.append(f"High NaN ratio in 'property_value': {round(nan_ratio, 3)}")

        if raise_on_error and problems:
            raise ValueError("Validation errors:\n  - " + "\n  - ".join(problems))
        return problems

    # ---- Convenience queries

    def property_names(self, *, section: Optional[str] = None, title: Optional[str] = None) -> List[str]:
        """Return the sorted, distinct property names, optionally restricted to
        one `section` and/or one `title`."""
        q = self.df
        if section is not None:
            q = q[q["section"] == section]
        if title is not None:
            q = q[q["title"] == title]
        names = sorted(q["property_name"].dropna().unique().tolist())
        self.logger.debug(f"property_names(section={section}, title={title}) -> {names}")
        return names

    def properties_for_title(
        self,
        title: str,
        *,
        section: Optional[str] = None,
        drop_na: bool = True,
    ) -> Dict[str, object]:
        """Return ``{property_name: property_value}`` for the rows of `title`.

        Rows can additionally be restricted to one `section`. With `drop_na`
        rows whose value is missing are left out. If a property name occurs
        more than once the last row wins.
        """
        q = self.df[self.df["title"] == title]
        if section is not None:
            q = q[q["section"] == section]
        if drop_na:
            q = q[~q["property_value"].isna()]
        result = dict(zip(q["property_name"], q["property_value"]))
        self.logger.debug(f"properties_for_title(title={title!r}, section={section}) -> {len(result)} items")
        return result

    def values_by_property(
        self,
        prop_name: str,
        *,
        section: Optional[str] = None,
        null_str: str = "",
    ) -> List[Tuple[str, str]]:
        """Return ``(title, value)`` string pairs for every row of `prop_name`.

        Rows can be restricted to one `section`. Missing titles/values, and
        text that merely spells a null ("", "nan", "<na>", "none", "null",
        case-insensitive), are replaced by `null_str`.
        """
        q = self.df.loc[self.df["property_name"] == prop_name].copy()
        if section is not None:
            q = q.loc[q["section"] == section]

        def to_str_or_null(x):
            # Treat real nulls and common "nullish" text as null_str
            if pd.isna(x):
                return null_str
            if isinstance(x, str) and x.strip().lower() in {"", "nan", "<na>", "none", "null"}:
                return null_str
            return str(x)

        titles = q["title"].map(to_str_or_null)
        values = q["property_value"].map(to_str_or_null)

        pairs = list(zip(titles.tolist(), values.tolist()))
        self.logger.debug(
            f"values_by_property({prop_name!r}, section={section}) -> {len(pairs)} rows"
        )
        return pairs


# -------- Orchestrator --------------------------------------------------------

@dataclass
class InterfacesBundle:
    """The two interface CSVs loaded from one directory, plus that directory."""
    # Directory the two CSV files were found in.
    base_dir: Path
    # Wrapper around the "process" CSV.
    process: InterfaceCSV
    # Wrapper around the "structural" CSV.
    structural: InterfaceCSV


def load_interfaces_from_first_match(
    root: str | Path,
    dir_name_parts: Iterable[str],
    *,
    recurse: bool = False,
    case_insensitive: bool = True,
    process_filenames: Iterable[str] = ("process_interface_items.csv", "process_interface.csv"),
    structural_filenames: Iterable[str] = ("structural_interface_items.csv", "structural_interface.csv"),
    dry_run: bool = False,
    logger: logging.Logger = log,
) -> InterfacesBundle:
    """
    Locate an interface directory and load its process and structural CSVs.

    1) Find first directory under `root` whose name contains any `dir_name_parts`.
    2) Load the first existing CSV for each category (process/structural) using
       the provided filename candidates (in order).
    3) Return typed wrappers with convenience query methods.

    Args:
        root: Directory to search in.
        dir_name_parts: Name substrings, tried in order, to pick the directory.
        recurse: Search the whole tree below `root` instead of its children only.
        case_insensitive: Match directory names case-insensitively.
        process_filenames: Candidate file names for the process CSV. Any
            iterable works (including one-shot iterators); a bare string is
            treated as a single file name.
        structural_filenames: Same, for the structural CSV.
        dry_run: Only adds a log line naming the files that were discovered;
            the files are still loaded so validation output can be previewed.
        logger: Logger that receives progress messages.

    Raises:
        FileNotFoundError: no directory matched, or a category has no
            candidate file in the selected directory.
    """
    base = find_first_matching_dir(
        root, dir_name_parts, recurse=recurse, case_insensitive=case_insensitive, logger=logger
    )

    def resolve_one(kind: str, candidates: Iterable[str]) -> Path:
        # Materialise exactly once. The names are needed three times (log
        # line, search loop, error message); a generator would be exhausted
        # by the first use and the search would then always fail.
        names = [candidates] if isinstance(candidates, str) else list(candidates)
        logger.info(f"Looking for {kind} CSV among candidates: {names}")
        for name in names:
            p = base / name
            logger.debug(f"Check: {p}")
            if p.is_file():
                logger.info(f"{kind} CSV selected: {p}")
                return p
        raise FileNotFoundError(
            f"{kind}: none of the candidate files found in {base}:\n  - " + "\n  - ".join(map(str, names))
        )

    process_path = resolve_one("process", process_filenames)
    structural_path = resolve_one("structural", structural_filenames)

    if dry_run:
        logger.info(f"[dry_run=True] Would load:\n  process:    {process_path}\n  structural: {structural_path}")
        # Load anyway to allow validation previews, but caller knows it's a dry run
    process = InterfaceCSV.load(process_path, logger=logger)
    structural = InterfaceCSV.load(structural_path, logger=logger)

    logger.info("Bundle loaded successfully.")
    return InterfacesBundle(base_dir=base, process=process, structural=structural)

# Turn on detailed logging while you debug:
# Turn on detailed logging while you debug:
setup_interface_logger(logging.DEBUG)

bundle = load_interfaces_from_first_match(
    root="per_source_interface_items",
    dir_name_parts=[interface.name],  # tried in order
    recurse=True,
    dry_run=False,  # True only adds a log line naming the discovered files; they are loaded either way
)

# Quick summaries. They are printed explicitly because a notebook only echoes
# the last expression of a cell; a bare call here would compute the summary
# and throw it away.
print(bundle.process.debug_summary())
print(bundle.structural.debug_summary())

# Force validation errors to raise (useful in tests)
bundle.structural.validate(raise_on_error=True)



[INFO] Searching under root: /home/simcenter/studio/Interface_Control_Example/Interface_Control_Example/Notebooks/per_source_interface_items
[DEBUG] Found 7 subdirectories to scan (recurse=True).
[INFO] Trying name part: 'hss_hsss_to_agss'
[DEBUG] Matches for 'hss_hsss_to_agss': ['0000_hss_hsss_to_agss_high_pressure_helium_makeup_for_agss_charge_discharge_buffers']
[INFO] Selected directory: /home/simcenter/studio/Interface_Control_Example/Interface_Control_Example/Notebooks/per_source_interface_items/0000_hss_hsss_to_agss_high_pressure_helium_makeup_for_agss_charge_discharge_buffers
[INFO] Looking for process CSV among candidates: ['process_interface_items.csv', 'process_interface.csv']
[DEBUG] Check: /home/simcenter/studio/Interface_Control_Example/Interface_Control_Example/Notebooks/per_source_interface_items/0000_hss_hsss_to_agss_high_pressure_helium_makeup_for_agss_charge_discharge_buffers/process_interface_items.csv
[INFO] process CSV selected: /home/simcenter/studio/Interface_Co

[]

In [4]:
from __future__ import annotations
import io
from jinja2 import Environment, BaseLoader

def _q(s: str | None) -> str:
    """Minimal YAML-safe quoting."""
    if s is None: return '""'
    s = str(s).replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    return f'"{s}"'

# Declarative-YAML template. It extends the element given by `parent_uuid`
# with one new property value group and lists that same group under
# applied_property_value_groups through the `pvg_id` promise.
# The description line is emitted only when `pvg_desc` is truthy.
# The text is whitespace-sensitive: the leading spaces are the YAML indentation.
_PVG_APVG_TMPL = """\
- parent: !uuid {{ parent_uuid }}
  extend:
    property_value_groups:
      - name: {{ pvg_name | q }}
        promise_id: pvg_id
{% if pvg_desc %}
        description: {{ pvg_desc | q }}
{% endif %}
    applied_property_value_groups:
      - !promise pvg_id
"""

# NOTE(review): trim_blocks presumably removes the newline after each
# {% ... %} tag — the rendered YAML shown in the cell output has no blank
# lines around the description. The tags sit at column 0, so lstrip_blocks
# is not required.
_env = Environment(loader=BaseLoader(), trim_blocks=True)   # <-- no lstrip_blocks
# Make the quoting helper available to the template as the `q` filter.
_env.filters["q"] = _q
# Compiled once here and reused by every render call.
_tmpl = _env.from_string(_PVG_APVG_TMPL)

def apply_pvg_decl_simple(model, parent_uuid: str, pvg_name: str, pvg_desc: str = "") -> str:
    """Render the PVG/APVG template and apply it to `model`.

    The rendered declarative YAML adds a property value group named `pvg_name`
    (with `pvg_desc` as description when non-empty) to the element
    `parent_uuid` and references it as an applied property value group.

    Returns:
        The YAML text that was applied, so the caller can print or log it.
    """
    from capellambse import decl

    rendered = _tmpl.render(
        parent_uuid=parent_uuid,
        pvg_name=pvg_name,
        pvg_desc=pvg_desc,
    )
    # decl.apply is given a file-like object here, so wrap the text in a stream.
    yaml_stream = io.StringIO(rendered)
    decl.apply(model, yaml_stream)
    return rendered



import capellambse, inspect
#print(interface)
def _write_section_pvg(model, interface, df, pvg_name, pvg_desc, *, show_group=False):
    """Store the rows of one interface CSV as string property values.

    For every title in `df` a property value group named `pvg_name` is created
    on `interface` (and applied to it) through `apply_pvg_decl_simple`, then one
    StringPropertyValue per row is added to that group. Missing values are
    stored as an empty string.

    Args:
        model: Loaded model the declarative YAML is applied to.
        interface: Element that receives the property value group.
        df: DataFrame with `title`, `property_name` and `property_value` columns.
        pvg_name: Name of the property value group to create.
        pvg_desc: Description of the property value group.
        show_group: Print the group after each property that is added.

    Raises:
        LookupError: the group is not found on `interface` after creation.
    """
    # NOTE(review): groupby skips rows whose title is missing, and a CSV with
    # several titles yields several groups with the same name — confirm that
    # both are intended.
    for title, group in df.groupby("title"):
        yml_used = apply_pvg_decl_simple(
            model,
            parent_uuid=interface.uuid,
            pvg_name=pvg_name,
            pvg_desc=pvg_desc,
        )
        print(yml_used)  # optional: see exactly what was applied

        # The group just created is the last one carrying this name.
        matching = [pvg for pvg in interface.property_value_groups if pvg.name == pvg_name]
        if not matching:
            raise LookupError(
                f"Property value group {pvg_name!r} was not found on the interface after creation"
            )
        update_pvg = matching[-1]

        print(f"\n=== {title} ===")
        for _, row in group.iterrows():
            print(f"{row['property_name']}: {row['property_value']}")
            # Always take the value from the current row; reusing a variable
            # from an earlier loop would write the wrong value.
            raw_value = row["property_value"]
            val = "" if pd.isna(raw_value) else str(raw_value)
            update_pvg.property_values.create(
                "StringPropertyValue",
                name=row["property_name"],
                value=val,
            )
            if show_group:
                print(update_pvg)


_write_section_pvg(
    model,
    interface,
    bundle.structural.df,
    pvg_name="structural",
    pvg_desc="structural interface",
)

_write_section_pvg(
    model,
    interface,
    bundle.process.df,
    pvg_name="process",
    pvg_desc="process",
    show_group=True,
)

- parent: !uuid 304dad2a-1d26-46f3-a8d4-b3ae841c571b
  extend:
    property_value_groups:
      - name: "structural"
        promise_id: pvg_id
        description: "stuctural interface"
    applied_property_value_groups:
      - !promise pvg_id

=== HSS-HSSS to AGSS
High Pressure Helium Makeup for AGSS Charge/Discharge buffers ===
Internal_or_External: Internal
Subcategory: Pipe Flange
Weight: Pipe/Valve not modeled
Size: Pipe/Valve not modeled
Location: FHS Floor 4
- parent: !uuid 304dad2a-1d26-46f3-a8d4-b3ae841c571b
  extend:
    property_value_groups:
      - name: "process"
        promise_id: pvg_id
        description: "process"
    applied_property_value_groups:
      - !promise pvg_id

=== HSS-HSSS to AGSS
High Pressure Helium Makeup for AGSS Charge/Discharge buffers ===
Interface_Diameter: DN25
<PropertyValueGroup 'process' (b494bfc6-6beb-4efd-ad15-5f3cfeacb9f3)>
.applied_property_value_groups = []
.applied_property_values = []
.constraints = []
.description = Markup('process

In [5]:
# Persist the property value groups added to `model` in the previous cell.
model.save()
# Marker so the cell output shows that the save call returned.
print("Done")

Done


# 