In [1]:
import re
import time
import contextlib
import pythoncom
import win32com.client
from pythoncom import com_error

ST_LANG = 6  # TwinCAT ST

METHOD_HDR = re.compile(r"(?m)^\s*//\s*METHOD\s+(?P<name>\w+)\b.*$")

def clean_st_text(text: str) -> str:
    if not text:
        return ""
    text = text.lstrip("\ufeff").replace("\r\n", "\n").replace("\r", "\n")
    # Export-Header aus deinem KG entfernen
    text = re.sub(r"(?m)^\s*//\s*POU\s+.*?\s+body\s*\n", "", text)
    return text.strip() + "\n"

def split_blob_into_body_and_methods(st_blob: str):
    st_blob = clean_st_text(st_blob)
    headers = list(METHOD_HDR.finditer(st_blob))
    if not headers:
        return st_blob, {}

    body = st_blob[:headers[0].start()].strip() + "\n"
    methods = {}
    for i, h in enumerate(headers):
        name = h.group("name")
        start = h.end()
        end = headers[i + 1].start() if i + 1 < len(headers) else len(st_blob)
        methods[name] = clean_st_text(st_blob[start:end])
    return body, methods

RPC_E_CALL_REJECTED = -2147418111

def com_retry(fn, *args, retries: int = 60, delay: float = 0.25, **kwargs):
    last = None
    for _ in range(retries):
        try:
            return fn(*args, **kwargs)
        except com_error as e:
            last = e
            if getattr(e, "hresult", None) == RPC_E_CALL_REJECTED:
                time.sleep(delay)
                pythoncom.PumpWaitingMessages()
                continue
            raise
    raise last

# --- OLE Message Filter (sehr wichtig bei DTE) ---
# Manche pywin32-Versionen haben pythoncom.IID_IOleMessageFilter nicht.
# Dann definieren wir die IID manuell:
IID_IOleMessageFilter = getattr(pythoncom, "IID_IOleMessageFilter", None)
if IID_IOleMessageFilter is None:
    IID_IOleMessageFilter = pythoncom.MakeIID("{00000016-0000-0000-C000-000000000046}")

class OleMessageFilter:
    _public_methods_ = ["HandleInComingCall", "RetryRejectedCall", "MessagePending"]
    _com_interfaces_ = [IID_IOleMessageFilter]

    def HandleInComingCall(self, dwCallType, hTaskCaller, dwTickCount, lpInterfaceInfo):
        return 0  # SERVERCALL_ISHANDLED

    def RetryRejectedCall(self, hTaskCallee, dwTickCount, dwRejectType):
        # 2 = SERVERCALL_RETRYLATER
        if dwRejectType == 2:
            return 150  # retry after 150 ms
        return -1      # cancel

    def MessagePending(self, hTaskCallee, dwTickCount, dwPendingType):
        return 2  # PENDINGMSG_WAITDEFPROCESS

@contextlib.contextmanager
def ole_message_filter():
    """
    Manche pywin32 Builds haben pythoncom.CoRegisterMessageFilter nicht.
    In dem Fall machen wir einfach nichts (No-Op) und verlassen uns auf com_retry().
    """
    if not hasattr(pythoncom, "CoRegisterMessageFilter"):
        # Kein Register verfügbar -> No-Op
        yield
        return

    old = None
    try:
        old = pythoncom.CoRegisterMessageFilter(OleMessageFilter())
        yield
    finally:
        try:
            pythoncom.CoRegisterMessageFilter(old)
        except Exception:
            pass


  IID_IOleMessageFilter = pythoncom.MakeIID("{00000016-0000-0000-C000-000000000046}")


In [2]:
VS_PROJECT_KIND_SOLUTION_FOLDER = "{66A26720-8FB5-11D2-AA7E-00C04F688DDE}"

def create_dte():
    prog_ids = [
        "TcXaeShell.DTE.17.0", "VisualStudio.DTE.17.0",
        "TcXaeShell.DTE.16.0", "VisualStudio.DTE.16.0",
        "TcXaeShell.DTE.15.0", "VisualStudio.DTE.15.0",
        "VisualStudio.DTE.14.0", "VisualStudio.DTE.12.0", "VisualStudio.DTE.10.0",
    ]
    last = None
    for pid in prog_ids:
        try:
            # DispatchEx erzeugt meist eine frische Instanz -> weniger Konflikte
            try:
                dte = win32com.client.DispatchEx(pid)
            except Exception:
                dte = win32com.client.gencache.EnsureDispatch(pid)

            dte.SuppressUI = True
            try:
                dte.MainWindow.Visible = False
            except Exception:
                pass

            try:
                settings = dte.GetObject("TcAutomationSettings")
                settings.SilentMode = True
            except Exception:
                pass

            return dte
        except Exception as e:
            last = e
    raise RuntimeError(f"Kein passender DTE ProgID gefunden. Letzter Fehler: {last}")

def _safe_get_project_path(proj):
    for attr in ("FullName", "FileName", "UniqueName"):
        try:
            v = com_retry(getattr, proj, attr)
            if v:
                return str(v)
        except Exception:
            pass
    return None

def _iter_projects(solution):
    def walk(proj):
        try:
            kind = str(com_retry(getattr, proj, "Kind")).upper()
        except Exception:
            kind = None

        if kind == VS_PROJECT_KIND_SOLUTION_FOLDER:
            try:
                items = com_retry(getattr, proj, "ProjectItems")
                cnt = com_retry(getattr, items, "Count")
                for i in range(1, int(cnt) + 1):
                    try:
                        it = com_retry(items.Item, i)
                        sub = com_retry(getattr, it, "SubProject")
                        if sub is not None:
                            yield from walk(sub)
                    except Exception:
                        continue
            except Exception:
                return
        else:
            yield proj

    projects = com_retry(getattr, solution, "Projects")
    cnt = com_retry(getattr, projects, "Count")
    for i in range(1, int(cnt) + 1):
        try:
            p = com_retry(projects.Item, i)
            yield from walk(p)
        except Exception:
            continue

def open_solution_get_sysman(solution_path: str):
    with ole_message_filter():
        dte = create_dte()
        com_retry(dte.Solution.Open, solution_path)

        # kurze “Warmup”-Phase: DTE ist nach Open oft noch busy
        time.sleep(1.0)
        pythoncom.PumpWaitingMessages()

        tc_project = None
        for p in _iter_projects(dte.Solution):
            ppath = _safe_get_project_path(p)
            if ppath and ppath.lower().endswith(".tsproj"):
                tc_project = p
                break

        if tc_project is None:
            seen = []
            for p in _iter_projects(dte.Solution):
                seen.append((getattr(p, "Name", "?"), _safe_get_project_path(p), getattr(p, "Kind", None)))
            raise RuntimeError("Kein TwinCAT .tsproj gefunden. DTE sieht: " + repr(seen[:30]))

        sysman = com_retry(getattr, tc_project, "Object")
        return dte, sysman


In [3]:
def enum_children(tree_item):
    try:
        return list(tree_item)
    except Exception:
        cnt = int(com_retry(getattr, tree_item, "ChildCount"))
        return [com_retry(tree_item.Child, i) for i in range(1, cnt + 1)]

def find_first_plc_nested_project(sysman, prefer_name: str = None):
    root = com_retry(sysman.LookupTreeItem, "TIPC")
    candidates = []
    for child in enum_children(root):
        try:
            plc = com_retry(getattr, child, "NestedProject")
            candidates.append(plc)
        except Exception:
            continue

    if not candidates:
        raise RuntimeError("Kein PLC Projekt unter TIPC gefunden (NestedProject klappt nirgends).")

    if prefer_name:
        for plc in candidates:
            if prefer_name.lower() in (getattr(plc, "Name", "") or "").lower():
                return plc

    return candidates[0]

def get_pous_tree_path(sysman, prefer_plc_project_name: str = None):
    plc = find_first_plc_nested_project(sysman, prefer_name=prefer_plc_project_name)
    pous_path = f"{plc.PathName}^POUs"
    pous_item = com_retry(sysman.LookupTreeItem, pous_path)
    return pous_item, pous_path


In [4]:
def cast_to(obj, iface: str):
    try:
        return win32com.client.CastTo(obj, iface)
    except Exception:
        return obj

def _delete_child_safe(parent_item, child_name: str):
    try:
        com_retry(parent_item.DeleteChild, child_name)
        return True
    except Exception:
        return False

def _recreate_fb_as_st(pous_item, fb_name: str):
    _delete_child_safe(pous_item, fb_name)
    return com_retry(pous_item.CreateChild, fb_name, 604, "", ST_LANG)

def _get_impl_lang(tree_item):
    pou = cast_to(tree_item, "ITcPlcPou")
    impl = cast_to(pou, "ITcPlcImplementation")
    try:
        return int(com_retry(getattr, impl, "Language"))
    except Exception:
        return None

def _set_decl_impl(tree_item, decl_text: str, impl_text: str):
    pou = cast_to(tree_item, "ITcPlcPou")
    decl = cast_to(pou, "ITcPlcDeclaration")
    impl = cast_to(pou, "ITcPlcImplementation")

    # via Interfaces schreiben (robuster)
    com_retry(setattr, decl, "DeclarationText", decl_text)
    com_retry(setattr, impl, "ImplementationText", impl_text)

def upsert_job_fb_and_save(
    solution_path: str,
    fb_name: str,
    fb_decl: str,
    st_blob: str,
    pous_tree_path: str = None,
    prefer_plc_project_name: str = None,
):
    with ole_message_filter():
        dte, sysman = open_solution_get_sysman(solution_path)

        try:
            if pous_tree_path is None:
                pous_item, pous_tree_path = get_pous_tree_path(sysman, prefer_plc_project_name=prefer_plc_project_name)
            else:
                pous_item = com_retry(sysman.LookupTreeItem, pous_tree_path)

            fb_path = f"{pous_tree_path}^{fb_name}"

            # FB holen/erstellen
            try:
                fb_item = com_retry(sysman.LookupTreeItem, fb_path)
            except Exception:
                fb_item = com_retry(pous_item.CreateChild, fb_name, 604, "", ST_LANG)

            fb_body_impl, methods = split_blob_into_body_and_methods(st_blob)

            # ST erzwingen
            lang = _get_impl_lang(fb_item)
            if lang is not None and lang != ST_LANG:
                fb_item = _recreate_fb_as_st(pous_item, fb_name)

            # Schreiben (bei XML Fehler neu erstellen)
            try:
                _set_decl_impl(fb_item, fb_decl, fb_body_impl)
            except com_error as e:
                msg = str(e)
                if "System.Xml" in msg or "Ungültige Daten auf Stammebene" in msg:
                    fb_item = _recreate_fb_as_st(pous_item, fb_name)
                    _set_decl_impl(fb_item, fb_decl, fb_body_impl)
                else:
                    raise

            # Methoden anlegen
            for mname, impl_text in methods.items():
                m_path = f"{fb_path}^{mname}"
                try:
                    m_item = com_retry(sysman.LookupTreeItem, m_path)
                except Exception:
                    m_item = com_retry(fb_item.CreateChild, mname, 609, "", ST_LANG)

                _set_decl_impl(m_item, "", impl_text)

            # speichern
            com_retry(dte.ExecuteCommand, "File.SaveAll")

            return {"pous_tree_path": pous_tree_path, "fb_path": fb_path}

        finally:
            try:
                com_retry(dte.Quit)
            except Exception:
                pass


In [5]:
SOLUTION = r"C:\Users\Alexander Verkhov\OneDrive\Dokumente\MPA\TestProjektTwinCATEvents\TestProjektTwinCATEvents.sln"
FB_NAME = "VSG_AS_VerticalMoveEncoder"

ST_BLOB = """// POU VSG_AS_VerticalMoveEncoder body
HRL_RGB_VerticalMoveWithEncoder(
    MethodCall := MethodCall_VerticalMove,
    EmergencyStopSignal := ESS
);
DO_MovingVerticalVSG_01 := HRL_RGB_VerticalMoveWithEncoder.DigitalOutputControl_01;

// METHOD Abort of VSG_AS_VerticalMoveEncoder
callCounterAbort := callCounterAbort + 1;

// METHOD CheckState of VSG_AS_VerticalMoveEncoder
callCounterCheckState := callCounterCheckState + 1;

// METHOD Start of VSG_AS_VerticalMoveEncoder
callCounterStart := callCounterStart + 1;
"""

FB_DECL = f"""FUNCTION_BLOCK {FB_NAME}
VAR
    (* TODO: Variablen *)
END_VAR
"""

result = upsert_job_fb_and_save(
    solution_path=SOLUTION,
    fb_name=FB_NAME,
    fb_decl=FB_DECL,
    st_blob=ST_BLOB,
    pous_tree_path=None,
    prefer_plc_project_name=None
)

print("Fertig. POUs Pfad:", result["pous_tree_path"])
print("FB Pfad:", result["fb_path"])


AttributeError: Property 'LookupTreeItem.DeclarationText' can not be set.