In [1]:
import pandas as pd
import os
import datetime

In [28]:
# Graph Templates
def create_pd3_template(lld_templates: str, gpm_templates: str, lld_edges: str, lld_gpm_edges: str) -> str:
    """Assemble the complete PD3 Mermaid class diagram (left-to-right layout).

    Args:
        lld_templates: Class definitions for the LLD actions.
        gpm_templates: Class definitions for the GPM actions.
        lld_edges: Relations between LLD actions.
        lld_gpm_edges: Relations linking LLD actions to GPM actions.

    Returns:
        The diagram source: a ``classDiagram`` header followed by one
        ``%%``-commented section per argument, in the order listed above.
    """
    header = "\nclassDiagram\n    direction LR\n\n"
    sections = (
        ("LLD Actions", lld_templates),
        ("GPM Actions", gpm_templates),
        ("LLD Edges", lld_edges),
        ("LLD-GPM Edges", lld_gpm_edges),
    )
    body = "".join(f"    %% {title}\n{content}\n" for title, content in sections)
    # The diagram text ends with a four-space indent and no final newline.
    return header + body + "    "

def create_GPM_templates(GPM_IDs: list[int], GPM_inputs: list[str], GPM_names: list[str], GPM_outputs: list[str]) -> str:
    """Render one Mermaid class definition per GPM action.

    The four lists are parallel: position ``i`` of each one describes the same
    action. ``GPM_names`` determines how many entries are rendered.

    Returns:
        The concatenated class blocks (an empty string when ``GPM_names`` is empty).
    """
    rendered = []
    for position, name in enumerate(GPM_names):
        gpm_id = GPM_IDs[position]
        rendered.append(
            f'\n        class GPM_{gpm_id}["GPM_{gpm_id}: {name}"] {{\n'
            f"            Input: {GPM_inputs[position]}\n"
            f"            Output: {GPM_outputs[position]}\n"
            "        }\n"
            "        \n"
        )
    return "".join(rendered)

def create_LLD_templates(LLD_IDs: list[int], LLD_inputs: list[str], LLD_names: list[str], LLD_outputs: list[str]) -> str:
    """Render one Mermaid class definition per LLD action.

    The four lists are parallel; ``LLD_names`` determines how many entries are
    rendered and the other lists are read at the same positions.

    Returns:
        The concatenated class blocks (an empty string when ``LLD_names`` is empty).
    """
    def render(position: int) -> str:
        # Class identifier and label share the same LLD_<id> prefix.
        lld_id = LLD_IDs[position]
        return (
            f'\n        class LLD_{lld_id}["LLD_{lld_id}: {LLD_names[position]}"] {{\n'
            f"            Input: {LLD_inputs[position]}\n"
            f"            Output: {LLD_outputs[position]}\n"
            "        }\n"
            "        \n"
        )

    return "".join(render(position) for position in range(len(LLD_names)))

def create_LLD_edges(LLD_IDs: list[int]) -> str:
    """Chain consecutive LLD actions with Mermaid ``-->`` relations.

    Each ID is linked to the ID that follows it in the list, one relation per
    line. Lists with fewer than two IDs produce an empty string.
    """
    consecutive_pairs = zip(LLD_IDs, LLD_IDs[1:])
    return "".join(f"LLD_{source} --> LLD_{target}\n" for source, target in consecutive_pairs)

def create_LLD_GPM_edges(LLD_IDs: list[int], LLD_GPM_edges: list[int]) -> str:
    """Link every LLD action to its GPM class with an ``Implements`` relation.

    ``LLD_GPM_edges[i]`` is the GPM ID paired with ``LLD_IDs[i]``; the length of
    ``LLD_GPM_edges`` determines how many relations are produced.

    Returns:
        One ``LLD_<id> <|.. GPM_<id> : Implements`` line per pair.
    """
    relation_lines = [
        f"LLD_{LLD_IDs[position]} <|.. GPM_{gpm_id} : Implements\n"
        for position, gpm_id in enumerate(LLD_GPM_edges)
    ]
    return "".join(relation_lines)


In [29]:
# PartOf Relationship Template
def create_GPM_partof_diagram(gpm_templates: str, gpm_edges: str) -> str:
    """Assemble the GPM part-of Mermaid class diagram (bottom-to-top layout).

    Args:
        gpm_templates: Class definitions for the GPM actions.
        gpm_edges: Relations between the GPM actions.

    Returns:
        The diagram source with a ``%% GPM Actions`` section followed by a
        ``%% GPM Edges`` section.
    """
    header = "\nclassDiagram\n    direction BT\n\n"
    actions_section = f"    %% GPM Actions\n{gpm_templates}\n"
    edges_section = f"    %% GPM Edges\n{gpm_edges}\n"
    # The diagram text ends with a four-space indent and no final newline.
    return header + actions_section + edges_section + "    "

def create_GPM_edges(GPM_IDs: list[str], gpm_partOfs: list[str]) -> str:
    """Render one ``*--`` relation per GPM action towards its part-of target.

    ``gpm_partOfs[i]`` is paired with ``GPM_IDs[i]``; the length of
    ``gpm_partOfs`` determines how many relations are produced.

    Returns:
        One ``GPM_<id> *-- GPM_<partOf>`` line per pair.
    """
    # NOTE(review): every row yields a relation, including rows whose part-of
    # value matches no listed ID (e.g. a top-level marker) - confirm this is intended.
    relations = []
    for position, part_of in enumerate(gpm_partOfs):
        relations.append(f"GPM_{GPM_IDs[position]} *-- GPM_{part_of}\n")
    return "".join(relations)

In [30]:
# Which LLD actions GPM actions derive from
def create_LLD_templates_with_Log(LLD_logs: list[int], LLD_IDs: list[int], LLD_inputs: list[str], LLD_names: list[str], LLD_outputs: list[str]) -> str:
    """Render one Mermaid class definition per LLD action, including its log.

    The five lists are parallel; ``LLD_names`` determines how many entries are
    rendered and the other lists are read at the same positions.

    Returns:
        The concatenated class blocks, each with ``Log``, ``Input`` and
        ``Output`` members (an empty string when ``LLD_names`` is empty).
    """
    blocks = []
    for position in range(len(LLD_names)):
        lld_id = LLD_IDs[position]
        block_lines = [
            "",
            f'        class LLD_{lld_id}["LLD_{lld_id}: {LLD_names[position]}"] {{',
            f"            Log: {LLD_logs[position]}",
            f"            Input: {LLD_inputs[position]}",
            f"            Output: {LLD_outputs[position]}",
            "        }",
            "        ",
            "",
        ]
        blocks.append("\n".join(block_lines))
    return "".join(blocks)

def create_GPM_reference_template(lld_templates: str, gpm_templates: str, lld_gpm_edges: str) -> str:
    """Assemble the GPM-to-LLD reference Mermaid class diagram (left-to-right layout).

    Args:
        lld_templates: Class definitions for the LLD actions.
        gpm_templates: Class definitions for the GPM actions.
        lld_gpm_edges: Relations linking LLD actions to GPM actions.

    Returns:
        The diagram source; the GPM section is emitted before the LLD section,
        followed by the LLD-GPM relations.
    """
    pieces = (
        "\nclassDiagram\n",
        "    direction LR\n",
        f"    %% GPM Actions\n{gpm_templates}\n",
        f"    %% LLD Actions\n{lld_templates}\n",
        f"    %% LLD-GPM Edges\n{lld_gpm_edges}\n",
        "    ",
    )
    return "".join(pieces)

def GPM_LLD_references(df_pd3, df_GPM, LLD_Logs = "Log", LLD_IDs = "Action ID", LLD_Inputs="Input", LLD_Names="Action", LLD_Outputs="Output", GPM_IDs="ClassID", GPM_Inputs="ClassInput", GPM_Names="ClassName", GPM_Outputs="ClassOutput", LLD_GPM_Edges="ClassID"):
    """Build the diagram that shows which LLD actions each GPM action derives from.

    Args:
        df_pd3: Table of LLD actions; read through the ``LLD_*`` column names
            and ``LLD_GPM_Edges``.
        df_GPM: Table of GPM actions; read through the ``GPM_*`` column names.
        LLD_Logs, LLD_IDs, LLD_Inputs, LLD_Names, LLD_Outputs: Column names in
            ``df_pd3`` for the log, ID, input, name and output of each LLD action.
        GPM_IDs, GPM_Inputs, GPM_Names, GPM_Outputs: Column names in ``df_GPM``
            for the ID, input, name and output of each GPM action.
        LLD_GPM_Edges: Column name in ``df_pd3`` holding the GPM ID of each LLD action.

    Returns:
        The diagram source containing the GPM classes, the LLD classes (with
        their log) and the LLD-to-GPM relations.
    """
    def lld_column(column_name):
        return df_pd3[column_name].tolist()

    def gpm_column(column_name):
        return df_GPM[column_name].tolist()

    lld_classes = create_LLD_templates_with_Log(
        lld_column(LLD_Logs),
        lld_column(LLD_IDs),
        lld_column(LLD_Inputs),
        lld_column(LLD_Names),
        lld_column(LLD_Outputs),
    )
    gpm_classes = create_GPM_templates(
        gpm_column(GPM_IDs),
        gpm_column(GPM_Inputs),
        gpm_column(GPM_Names),
        gpm_column(GPM_Outputs),
    )
    implements_relations = create_LLD_GPM_edges(lld_column(LLD_IDs), lld_column(LLD_GPM_Edges))
    return create_GPM_reference_template(lld_classes, gpm_classes, implements_relations)

In [None]:
# Switch LLD actions into GPM actions
def switch_lld_to_gpm(gpm_templates, gpm_edges) -> str:
    """Assemble the GPM flow Mermaid class diagram (left-to-right layout).

    Args:
        gpm_templates(str): Class definitions for the GPM actions.
        gpm_edges(str): Relations between the GPM actions.
    Returns:
        str: The diagram source with a ``%% GPM Actions`` section followed by a
        ``%% GPM Edges`` section.
    """
    header = "\nclassDiagram\n    direction LR\n"
    actions_section = f"    %% GPM Actions\n{gpm_templates}\n"
    edges_section = f"    %% GPM Edges\n{gpm_edges}\n"
    # The diagram text ends with a four-space indent and no final newline.
    return "".join((header, actions_section, edges_section, "    "))

def create_GPM_diagram_edges(GPM_IDs: list[tuple[int, int]]) -> str:
    """Render directed ``-->`` relations between GPM actions.

    Args:
        GPM_IDs: ``(from_gpm, to_gpm)`` ID pairs, one per relation.

    Returns:
        One ``GPM_<from> --> GPM_<to>`` line per pair (an empty string for no pairs).
    """
    # A pair of ints is spelled tuple[int, int]; list[(int, int)] is parsed as
    # list[int, int], which is not a valid parametrisation of list.
    return "".join(f"GPM_{from_gpm} --> GPM_{to_gpm}\n" for from_gpm, to_gpm in GPM_IDs)

def create_GPM_diagram_edges_with_counter(GPM_edges: list[tuple[tuple[int, int], tuple[int, int]]]) -> str:
    """Render directed ``-->`` relations between occurrence-numbered GPM nodes.

    Args:
        GPM_edges: Pairs of ``(gpm_id, frequency)`` nodes, given as
            ``((from_id, from_frequency), (to_id, to_frequency))``.

    Returns:
        One ``GPM_<id>_<frequency> --> GPM_<id>_<frequency>`` line per pair
        (an empty string for no pairs).
    """
    # Nested pairs are spelled with tuple[...]; list[((int, int), (int, int))]
    # is not a valid parametrisation of list.
    return "".join(
        f"GPM_{from_gpm}_{from_gpm_frequency} --> GPM_{to_gpm}_{to_gpm_frequency}\n"
        for (from_gpm, from_gpm_frequency), (to_gpm, to_gpm_frequency) in GPM_edges
    )

def create_GPM_templates_for_logs(GPM_IDs: list[int], GPM_inputs: list[str], GPM_names: list[str], GPM_outputs: list[str]) -> str:
    """Render one Mermaid class definition per GPM action.

    The four lists are parallel; ``GPM_names`` determines how many entries are
    rendered and the other lists are read at the same positions.

    Returns:
        The concatenated class blocks (an empty string when ``GPM_names`` is empty).
    """
    # Doubled braces are literal braces once the template is formatted.
    block_template = (
        '\n        class GPM_{gpm_id}["GPM_{gpm_id}: {name}"] {{\n'
        "            Input: {gpm_input}\n"
        "            Output: {gpm_output}\n"
        "        }}\n"
        "        \n"
    )
    return "".join(
        block_template.format(
            gpm_id=GPM_IDs[row],
            name=GPM_names[row],
            gpm_input=GPM_inputs[row],
            gpm_output=GPM_outputs[row],
        )
        for row in range(len(GPM_names))
    )

def create_GPM_templates_with_counter(df_GPM: pd.DataFrame, GPM_edges: list[tuple[tuple[int, int], tuple[int, int]]], GPM_IDs="ClassID", GPM_Inputs="ClassInput", GPM_Names="ClassName", GPM_Outputs="ClassOutput") -> str:
    """Render one Mermaid class per distinct ``(gpm_id, frequency)`` node of the edges.

    Args:
        df_GPM: Table of GPM actions that supplies name, input and output.
        GPM_edges: Pairs of ``(gpm_id, frequency)`` nodes, given as
            ``((from_id, from_frequency), (to_id, to_frequency))``.
        GPM_IDs, GPM_Inputs, GPM_Names, GPM_Outputs: Column names in ``df_GPM``.

    Returns:
        The concatenated class blocks. Each class is identified as
        ``GPM_<id>_<frequency>`` and labelled ``GPM_<id>: <name>``. Classes are
        emitted in the order their node first appears in ``GPM_edges``, so the
        output is stable from run to run.

    Raises:
        IndexError: If a node's ID has no row in ``df_GPM``.
    """
    # dict keys keep insertion order, giving de-duplication without the
    # arbitrary iteration order of a set.
    ordered_nodes = dict.fromkeys(
        node
        for (from_gpm, from_gpm_frequency), (to_gpm, to_gpm_frequency) in GPM_edges
        for node in ((from_gpm, from_gpm_frequency), (to_gpm, to_gpm_frequency))
    )

    blocks = []
    for gpm_id, gpm_frequency in ordered_nodes:
        # Filter once per node rather than once per rendered field.
        matches = df_GPM[df_GPM[GPM_IDs] == gpm_id]
        if matches.empty:
            raise IndexError(f"GPM id {gpm_id!r} not found in column {GPM_IDs!r}")
        blocks.append(f"""
        class GPM_{gpm_id}_{gpm_frequency}["GPM_{gpm_id}: {matches[GPM_Names].values[0]}"] {{
            Input: {matches[GPM_Inputs].values[0]}
            Output: {matches[GPM_Outputs].values[0]}
        }}
        \n""")
    return "".join(blocks)

def create_GPM_diagram(df_pd3: pd.DataFrame, df_gpm: pd.DataFrame, LLD_IDs = "Action ID", GPM_IDs="ClassID", GPM_Inputs="ClassInput", GPM_Names="ClassName", GPM_Outputs="ClassOutput") -> str:
    """
    Creates a GPM flow diagram from the sequence of LLD actions of one log.

    Each row of ``df_pd3`` is replaced by the GPM class it belongs to, and
    consecutive rows are connected. Because a class can occur several times in
    one sequence, every occurrence becomes its own node ``(class id, k)``,
    where ``k`` counts the earlier occurrences of that class.

    Args:
        df_pd3(pd.DataFrame): LLD actions in flow order (row order is the flow).
            Any index is accepted, including the non-zero-based index of a
            ``groupby(...).get_group(...)`` result.
        df_gpm(pd.DataFrame): GPM dataframe
        LLD_IDs(str): Kept for backward compatibility; the flow is derived from
            row order, so this column is no longer read.
        GPM_IDs(str): Column holding the GPM class ID, in both dataframes.
        GPM_Inputs(str), GPM_Names(str), GPM_Outputs(str): Columns of ``df_gpm``.
    Returns:
        str: GPM diagram
    """
    from collections import Counter

    # Number the occurrences of every GPM class along the sequence. Working on
    # the column values (positions) keeps this independent of the index labels.
    occurrences_seen = Counter()
    nodes = []
    for gpm_id in df_pd3[GPM_IDs].tolist():
        nodes.append((gpm_id, occurrences_seen[gpm_id]))
        occurrences_seen[gpm_id] += 1

    # Connect each occurrence to the next one. Numbering the nodes first means
    # two consecutive actions of the same class become X_k --> X_k+1 rather
    # than a self-loop followed by a disconnected node.
    GPM_edges = list(zip(nodes, nodes[1:]))

    # Get the templates of GPM actions, honouring the caller's column names
    gpm_templates = create_GPM_templates_with_counter(
        df_gpm,
        GPM_edges,
        GPM_IDs=GPM_IDs,
        GPM_Inputs=GPM_Inputs,
        GPM_Names=GPM_Names,
        GPM_Outputs=GPM_Outputs,
    )

    # Get the templates of GPM edges
    gpm_edges_template = create_GPM_diagram_edges_with_counter(GPM_edges)

    # Combine the templates and edges
    return switch_lld_to_gpm(gpm_templates, gpm_edges_template)

In [32]:
def files_in_dir(directory_path):
    """Print the names of the entries found directly inside ``directory_path``.

    The listing is whatever ``os.listdir`` reports, printed as a single Python
    list. Nothing is returned.
    """
    print(os.listdir(directory_path))

def get_current_datetime_components(now=None):
    """Return a ``YYYY-MM-DD-HH-MM`` timestamp as a single string.

    Args:
        now: Optional ``datetime.datetime`` to format. Defaults to the current
            local time, which is what callers passing no argument get.

    Returns:
        str: Year, month, day, hour and minute joined by hyphens,
        e.g. ``"2026-01-09-17-03"``.
    """
    if now is None:
        now = datetime.datetime.now()
    # A single strftime call yields the same text as formatting each component
    # separately, and avoids nesting double quotes inside a double-quoted
    # f-string (a syntax error before Python 3.12).
    return now.strftime("%Y-%m-%d-%H-%M")

def create_mermaid_diagram(df_pd3, df_GPM, LLD_IDs = "Action ID", LLD_Inputs="Input", LLD_Names="Action", LLD_Outputs="Output", GPM_IDs="ClassID", GPM_Inputs="ClassInput", GPM_Names="ClassName", GPM_Outputs="ClassOutput", LLD_GPM_Edges="ClassID"):
    """
    Build the PD3 Mermaid diagram for a sequence of LLD actions and the GPM classes.

    Args:
        df_pd3 (pandas.DataFrame): LLD actions; read through the ``LLD_*``
            column names and ``LLD_GPM_Edges``.
        df_GPM (pandas.DataFrame): GPM actions; read through the ``GPM_*`` column names.
        LLD_IDs, LLD_Inputs, LLD_Names, LLD_Outputs: Column names in ``df_pd3``.
        GPM_IDs, GPM_Inputs, GPM_Names, GPM_Outputs: Column names in ``df_GPM``.
        LLD_GPM_Edges: Column name in ``df_pd3`` holding the GPM ID of each LLD action.
    Returns:
        str: Mermaid diagram code containing the LLD classes, the GPM classes,
        the relations between consecutive LLD actions and the LLD-to-GPM relations.
    """
    def lld_column(column_name):
        return df_pd3[column_name].tolist()

    def gpm_column(column_name):
        return df_GPM[column_name].tolist()

    lld_classes = create_LLD_templates(
        lld_column(LLD_IDs),
        lld_column(LLD_Inputs),
        lld_column(LLD_Names),
        lld_column(LLD_Outputs),
    )
    gpm_classes = create_GPM_templates(
        gpm_column(GPM_IDs),
        gpm_column(GPM_Inputs),
        gpm_column(GPM_Names),
        gpm_column(GPM_Outputs),
    )
    lld_chain = create_LLD_edges(lld_column(LLD_IDs))
    implements_relations = create_LLD_GPM_edges(lld_column(LLD_IDs), lld_column(LLD_GPM_Edges))
    return create_pd3_template(lld_classes, gpm_classes, lld_chain, implements_relations)

def create_GPM_containers(df_GPM, GPM_IDs="ClassID", GPM_Inputs="ClassInput", GPM_Names="ClassName", GPM_Outputs="ClassOutput", GPM_Parents="PartOf"):
    """
    Build the Mermaid diagram of the part-of relations between GPM classes.

    Args:
        df_GPM (pandas.DataFrame): GPM actions; read through the ``GPM_*`` column names.
        GPM_IDs, GPM_Inputs, GPM_Names, GPM_Outputs: Column names in ``df_GPM``
            for the ID, input, name and output of each class.
        GPM_Parents: Column name in ``df_GPM`` holding the ID each class is part of.
    Returns:
        str: Mermaid diagram code with one class per row (Input and Output as
        members) and one part-of relation per row.
    """
    def gpm_column(column_name):
        return df_GPM[column_name].tolist()

    class_blocks = create_GPM_templates(
        gpm_column(GPM_IDs),
        gpm_column(GPM_Inputs),
        gpm_column(GPM_Names),
        gpm_column(GPM_Outputs),
    )
    part_of_relations = create_GPM_edges(gpm_column(GPM_IDs), gpm_column(GPM_Parents))
    return create_GPM_partof_diagram(class_blocks, part_of_relations)

In [33]:
# Print the entries of ./target/ so the input workbook names can be copied into the loading cell.
files_in_dir("./target/")

['ma-welding-gpmreconstruction-2026-01-09-17-03.xlsx', 'ma-welding-lldgrouping-2026-01-09-17-03.xlsx', 'merged_lld_gpm-2026-01-09-17-58.xlsx']


In [40]:
# Load the GPM class table and the LLD action table from the Excel workbooks in ./target/.
df_GPM = pd.read_excel("./target/ma-welding-gpmreconstruction-2026-01-09-17-03.xlsx")
df_pd3 = pd.read_excel("./target/ma-welding-lldgrouping-2026-01-09-17-03.xlsx")
# Preview the first GPM row (plain-text print, since it is not the last expression of the cell).
print(df_GPM.head(1))
# Group the LLD actions by their "Log" column. Despite the df_ prefix this is a
# GroupBy object, not a DataFrame; later cells iterate over its groups.
df_each_log = df_pd3.groupby("Log", as_index=False)
# Cell output: the first row of every log.
df_each_log.head(1)

   ClassID              ClassInput       ClassName       ClassOutput  \
0        1  不具合が疑われる対象（部品/Assy/工程）  不具合改善プロセスを統括する  改善完了の判断・標準化された対策   

                                      ClassIntent  \
0  溶接工程における部品精度/組付精度の不具合を、現状把握→原因特定→対策→検証の流れで解消する   

                             ClassConstructionReason  PartOf PartOfReason  
0  Sourcesの工程ルール『不具合を観察…→原因推定と仮説検証→…→対策案の立案と効果検証』...       0   最上位プロセスのため  


Unnamed: 0,Log,Action ID,Input,Action,Output,Intention,Rationale,Annotation,Tools,Engineering knowledge,ClassID,ClassificationReason
0,1,1,治具に固定された300Dフロントピラー,300dフロントピラーの三次元形状を測定する,300Dフロントピラーの三次元形状データ；300DフロントピラーのCADデータ,組み付け精度を確認するために、部品に不具合がないかを知りたい,no rationale,no annotation,三次元形状測定機,no engineering knowledge,4,「三次元形状を測定する」は現状把握のための計測（Sources: Engineering p...
36,2,37,治具にセット済みの300Dトルーフ,目視や手触りによる外観チェックで300Dトルーフの不具合を観察する,300Dトルーフの１打点目がパネルズレがあり、\n打点すると位置ズレが起こる,どんな不具合があるのかを知りたい,no rationale,no annotation,Tool\n目視、手触り,知識：\n・外観チェックで隙間や位置ズレなどの目立ている不具合の確認ができる\n・隙間は1m...,3,「目視や手触りで観察」は不具合観察（現状把握）。Sources: 工程ルール『不具合を観察』。
72,3,73,セット済みの120D の部品,部品の三次元形状を測定する,120D の三次元形状データ,部品の形状に異常ないかを知りたい,no rationale,no annotation,三次元測定機,no engineering knowledge,4,「三次元形状を測定する」は現状把握の計測（120D）。Sources: Keyword『12...
101,4,102,no input,単品の32Dセンターフロアの精度を測定する,測定結果：\n１：単品の①②③面精度が悪い\n２：基準Ⅰ、Ⅱの穴径精度が悪い,部品に不具合がないかを知りたい,no rationale,no annotation,no tools,Knowledge：\n知識：\n・③の面位置が公差土1.5に対して・1.6～-1.8と公差...,4,「32Dセンターフロアの精度を測定」は現状把握の計測。Sources: Keyword『32...
122,5,123,no input,部品460B ラダーAssyに不具合がないかを調べる,ロッカーインナRr×クロスNo1\n\nW方向外開き、縦面隙あり,部品に不具合がないかを知りたい,no rationale,no annotation,Tool：\n目視,no engineering knowledge,3,「不具合がないか調べる（目視）」は観察による現状把握。Sources: 工程ルール『不具合を...


In [11]:
# Write one PD3 diagram per log into ./outputs/.
os.makedirs("./outputs", exist_ok=True)  # open() does not create missing directories
# Take the timestamp once so every file of this run shares the same suffix.
run_stamp = get_current_datetime_components()
for group in df_each_log.groups.keys():
    graph = create_mermaid_diagram(df_each_log.get_group(group), df_GPM)
    # The diagrams contain non-ASCII text, so pin the encoding instead of
    # relying on the platform default.
    with open(f"./outputs/graph_{group}-{run_stamp}.mmd", "w", encoding="utf-8") as f:
        f.write(graph)
    print(f"Log: {group} => The PD3 has been created.")

Log: 1 => The PD3 has been created.
Log: 2 => The PD3 has been created.
Log: 3 => The PD3 has been created.
Log: 4 => The PD3 has been created.
Log: 5 => The PD3 has been created.


In [12]:
# Write the GPM part-of diagram into ./outputs/.
containers = create_GPM_containers(df_GPM)
os.makedirs("./outputs", exist_ok=True)  # open() does not create missing directories
# The diagram contains non-ASCII text, so pin the encoding instead of relying
# on the platform default.
with open(f"./outputs/GPM_containers-{get_current_datetime_components()}.mmd", "w", encoding="utf-8") as f:
    f.write(containers)

In [13]:
# Write the diagram of LLD actions referenced by each GPM action into ./outputs/.
gpm_lld_references_diagram = GPM_LLD_references(df_pd3, df_GPM)
os.makedirs("./outputs", exist_ok=True)  # open() does not create missing directories
# The diagram contains non-ASCII text, so pin the encoding instead of relying
# on the platform default.
with open(f"./outputs/gpm_references-{get_current_datetime_components()}.mmd", "w", encoding="utf-8") as f:
    f.write(gpm_lld_references_diagram)

In [41]:
# Write one GPM flow diagram per log into ./outputs/.
os.makedirs("./outputs", exist_ok=True)  # open() does not create missing directories
# Take the timestamp once so every file of this run shares the same suffix.
run_stamp = get_current_datetime_components()
for group in df_each_log.groups.keys():
    gpm_diagram_for_group = create_GPM_diagram(df_each_log.get_group(group), df_GPM)
    # The diagrams contain non-ASCII text, so pin the encoding instead of
    # relying on the platform default.
    with open(f"./outputs/GPM_Flow_{group}-{run_stamp}.mmd", "w", encoding="utf-8") as f:
        f.write(gpm_diagram_for_group)
    # This cell produces GPM flow diagrams, not PD3 diagrams.
    print(f"Log: {group} => The GPM flow diagram has been created.")

Log: 1 => The PD3 has been created.
Log: 2 => The PD3 has been created.
Log: 3 => The PD3 has been created.
Log: 4 => The PD3 has been created.
Log: 5 => The PD3 has been created.
