In [1]:
from pyiron_workflow import Workflow
from PIL import Image
from pathlib import Path
import matplotlib.pylab as plt
import numpy as np
import os
import shutil
import base64
import io
import pylnk3
import re
import subprocess
import uuid
import time

In [None]:
### Helper function nodes: choose per macro whether the working directory is reused/overwritten or a new folder is created.

## Create a new numbered directory on every call; never overwrite an existing one
@Workflow.wrap.as_function_node
def generate_working_directory_keep(path, directory_name):
    """Create and return a fresh, numbered working directory below ``path``.

    The new directory is called ``<directory_name>_<n>`` where ``n`` is one
    larger than the highest number already used by a sibling directory of
    exactly that pattern, or 1 if no such directory exists yet.

    Parameters:
        path: Parent directory; it is created (including parents) if missing.
        directory_name: Name prefix shared by all numbered directories.

    Returns:
        pathlib.Path of the newly created directory.

    Raises:
        FileExistsError: if the chosen directory already exists when it is
            created (``mkdir`` is called without ``exist_ok``).
    """
    base = Path(path)
    base.mkdir(parents=True, exist_ok=True)

    # Only "<directory_name>_<digits>" counts as a numbered sibling. Entries
    # such as the bare "<directory_name>" or "<directory_name>_backup" are
    # skipped instead of being fed to int(), which used to raise ValueError.
    prefix = f"{directory_name}_"
    numbers = []
    for entry in base.iterdir():
        suffix = entry.name[len(prefix):]
        if (
            entry.is_dir()
            and entry.name.startswith(prefix)
            and suffix.isascii()
            and suffix.isdigit()
        ):
            numbers.append(int(suffix))

    # default=0 covers "nothing numbered yet" without a separate branch.
    next_num = max(numbers, default=0) + 1
    working_directory = base / f"{directory_name}_{next_num}"
    working_directory.mkdir()
    return working_directory


## Always use the same directory name (overwrite instead of creating a new folder)
@Workflow.wrap.as_function_node
def generate_working_directory_overwrite(path, directory_name):
    """Return ``path/directory_name``, emptied on its first use in this session.

    The first call for a given directory deletes it (if present) and creates
    it again. Every later call for the same directory keeps the contents and
    only re-creates the directory when it has gone missing.

    Which directories were already reset is remembered per directory in a set
    stored as an attribute on the module-level name
    ``generate_working_directory_overwrite``. A single shared flag was used
    before, so only the very first directory of a session was ever cleaned
    and all others silently kept stale files.

    Parameters:
        path: Parent directory.
        directory_name: Name of the working directory below ``path``.

    Returns:
        pathlib.Path of the working directory (it exists on return).
    """
    working_directory = Path(path) / directory_name
    print(f"[INFO] Working directory: {working_directory}")

    # Session-wide bookkeeping, created lazily on first use.
    initialized = getattr(generate_working_directory_overwrite, "_initialized_directories", None)
    if initialized is None:
        initialized = set()
        generate_working_directory_overwrite._initialized_directories = initialized

    # Normalised absolute path so that different spellings of the same
    # directory share one entry.
    key = os.path.normcase(os.path.abspath(working_directory))

    if key not in initialized:
        if working_directory.exists():
            print("[INFO] (First run) Directory exists — deleting it and recreating.")
            shutil.rmtree(working_directory)
        working_directory.mkdir(parents=True, exist_ok=True)
        initialized.add(key)
        print("[INFO] (First run) Directory ready.")
    elif not working_directory.exists():
        print("[INFO] (Later run) Directory missing — creating it again.")
        working_directory.mkdir(parents=True, exist_ok=True)
    else:
        print("[INFO] (Later run) Directory already exists — keep using it (no delete).")

    return working_directory

In [3]:
### Macro_node: Abaqus macro script to .inp

@Workflow.wrap.as_function_node
def write_abaqus_input(script_path, working_directory):
    """Copy the Abaqus script into ``working_directory``.

    The copy keeps the file name of ``script_path``; the path of the copy
    (built with ``os.path.join``) is returned.
    """
    # Same file name, new folder.
    script_name = os.path.split(script_path)[1]
    local_input_script = os.path.join(working_directory, script_name)
    shutil.copy(src=script_path, dst=local_input_script)
    return local_input_script


@Workflow.wrap.as_function_node
def run_in_abaqus(executable, script_path, working_directory, timeout=60, poll_interval=2):
    """Run a script in Abaqus/CAE without GUI and locate the produced files.

    The command ``"<executable>" cae noGUI=<script_path>`` is executed through
    the shell with ``working_directory`` as current directory. Afterwards the
    directory is polled until it contains at least one ``.cae`` and one
    ``.inp`` file (extension compared case-insensitively).

    Parameters:
        executable: Abaqus launcher; it is wrapped in double quotes.
        script_path: Script handed to ``noGUI=``.
        working_directory: Directory the command runs in and is scanned.
        timeout: Maximum number of seconds to wait for the files after the
            command has returned.
        poll_interval: Seconds to sleep between two directory scans.

    Returns:
        dict with ``"inp_path"`` and ``"cae_path"``; when several candidates
        exist the alphabetically first file name is used.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
        TimeoutError: if the files do not show up within ``timeout`` seconds.
    """
    ## Construct and run Abaqus on the command line
    # NOTE(review): script_path is interpolated unquoted, so a path containing
    # spaces would presumably be split by the shell — confirm before quoting,
    # since the command line itself is deliberately left unchanged here.
    cmd = f'"{executable}" cae noGUI={script_path}'
    subprocess.check_call(cmd, cwd=working_directory, shell=True)   ## Run the command in the working directory

    # The sleep used to sit outside the loop, which made this a busy loop that
    # never ended when a file was missing. Now: scan, give up after `timeout`,
    # otherwise pause and scan again.
    deadline = time.monotonic() + timeout
    while True:
        files = sorted(os.listdir(working_directory))
        cae_files = [f for f in files if f.lower().endswith('.cae')]
        inp_files = [f for f in files if f.lower().endswith('.inp')]
        if cae_files and inp_files:
            print(f"Finish, all files： {cae_files}, {inp_files}\n")
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"No .cae/.inp pair found in {working_directory} within {timeout} s "
                f"(found .cae: {cae_files}, .inp: {inp_files})"
            )
        time.sleep(poll_interval)

    cae_path = os.path.join(working_directory, cae_files[0])
    inp_path = os.path.join(working_directory, inp_files[0])

    return {"inp_path":inp_path,
            "cae_path":cae_path}

# @Workflow.wrap.as_function_node
# def collect_output(working_directory):
#     print(working_directory)
#     # start_time = time.time()
#     # timeout = 30
#     files = os.listdir(working_directory)
#     print(files)
#     while True:
#         files = os.listdir(working_directory) # "C:/Local_Dong/Projekt/AnAttAl/CAD/AnAttAl_CAD_OnlyWorkflow_demo/Python_marco_to_inp")
#         print(files)
#         cae_files = [f for f in files if f.lower().endswith('.cae')]
#         inp_files = [f for f in files if f.lower().endswith('.inp')]
#         if cae_files and inp_files:
#             print(f"Finish, all files： {cae_files}, {inp_files}\n")
#             break
#     # if time.time() - start_time > timeout:
#     #     raise TimeoutError("check the code")    
#     time.sleep(2)
        
#     cae_path = os.path.join(working_directory, cae_files[0])
#     inp_path = os.path.join(working_directory, inp_files[0]) 
    
#     return {"cae_path":cae_path, 
#             "inp_path":inp_path}
    ## Since it is a job object, saved in HDF5, it can be called directly without return


# @Workflow.wrap.as_function_node
# def generate_working_directory(path):
#     working_directory = Path(path) / "Python_marco_to_inp"
#     print(f"[INFO] Working directory: {working_directory}")

#     return working_directory


### Run Macro_node
@Workflow.wrap.as_macro_node("inp_path", "cae_path")
def export_inp_cae(macro, script_path, base_path, directory_name, executable=r"C:\SIMULIA\Commands\abq2018.bat"):
    """Macro: run an Abaqus script and expose the resulting .inp and .cae paths.

    Children, in creation order:
      * ``work_dir``   - generate_working_directory_overwrite(base_path, directory_name)
      * ``scripts``    - write_abaqus_input: copies ``script_path`` into ``work_dir``
      * ``inp_output`` - run_in_abaqus: runs the copied script with ``executable``

    Parameters:
        macro: The macro object the child nodes are attached to.
        script_path: Abaqus script to execute.
        base_path: Parent directory of the working directory.
        directory_name: Name of the working directory below ``base_path``.
        executable: Abaqus launcher; defaults to a machine-specific .bat file.

    Returns:
        The ``"inp_path"`` and ``"cae_path"`` items of ``inp_output``, in that
        order (matching the labels given to the macro decorator).
    """
    # path = macro.as_path() # This gives you the path based on the workflow name
    # The working directory is placed below the caller-supplied base_path instead.
    path = base_path
    macro.work_dir = generate_working_directory_overwrite(path, directory_name)
    # Alternative that creates a new numbered folder per run (note: it needs directory_name as well):
    # macro.working_directory = generate_working_directory_keep(path)
    macro.scripts = write_abaqus_input(script_path=script_path, 
                                       working_directory=macro.work_dir)
    # script_path is the output of the copy node, so Abaqus runs the local copy.
    macro.inp_output = run_in_abaqus(executable=executable, 
                                     script_path=macro.scripts, 
                                     working_directory=macro.work_dir)
    # macro.check = collect_output(working_directory=macro.work_dir)

    # This is in principle not required, but it makes sure that the nodes are executed in the given order
    # macro.work_dir >> macro.scripts >> macro.inp_output >> macro.
    
    # Item access on the child node picks single entries out of the returned dict.
    return (macro.inp_output["inp_path"],
            macro.inp_output["cae_path"])

In [4]:
### Macro_node: processing the .inp file

@Workflow.wrap.as_function_node
def write_inp_input(script_path, inp_path, working_directory):
    """Copy the processing script and the .inp file into ``working_directory``.

    Both copies keep their original file names. The script is copied first,
    the .inp file second. Returns a dict holding the two local paths under
    ``"script_path"`` and ``"inp_path"``.
    """
    staged = []
    for source in (script_path, inp_path):
        # Same file name, placed inside the working directory.
        target = os.path.join(working_directory, os.path.basename(source))
        shutil.copy(source, target)
        staged.append(target)
    local_script_path, local_inp_path = staged

    return {"script_path":local_script_path, 
            "inp_path":local_inp_path}


@Workflow.wrap.as_function_node
def run_in_python(executable, script_path, inp_path, working_directory):
    """Run ``script_path`` on ``inp_path`` and collect the resulting .txt files.

    Executes ``[executable, script_path, inp_path]`` (no shell) with
    ``working_directory`` as current directory, then lists every file in that
    directory whose name ends in ``.txt`` (case-insensitive).

    Returns:
        dict with key ``"txt_paths"``: absolute paths of the .txt files,
        sorted by file name.

    Raises:
        subprocess.CalledProcessError: if the script exits non-zero.
    """
    ## Constructor commands: python script Input file
    cmd = [str(executable), script_path, inp_path]
    subprocess.check_call(cmd, cwd=working_directory)

    # One directory scan feeds both the log line and the returned paths. The
    # two used to be filtered differently (case-insensitive vs. case-sensitive),
    # so a "RESULT.TXT" was printed but never returned.
    all_txt_files = sorted(f for f in os.listdir(working_directory) if f.lower().endswith(".txt"))
    all_txt_files_path = [os.path.abspath(os.path.join(working_directory, f)) for f in all_txt_files]
    print(f"finish, all files：{all_txt_files}\n")

    return {"txt_paths": all_txt_files_path}


### Run Macro_node
@Workflow.wrap.as_macro_node("txt_paths")
def processing_inp(macro, script_path, inp_path, base_path, directory_name, executable=r"python"):
    """Macro: post-process an .inp file with a Python script and expose the .txt outputs.

    Children, in creation order:
      * ``work_dir``       - generate_working_directory_overwrite(base_path, directory_name)
      * ``scripts``        - write_inp_input: copies script and .inp file into ``work_dir``
      * ``processing_inp`` - run_in_python: runs the copied script on the copied .inp file

    Parameters:
        macro: The macro object the child nodes are attached to.
        script_path: Python script that processes the .inp file.
        inp_path: The .inp file to process.
        base_path: Parent directory of the working directory.
        directory_name: Name of the working directory below ``base_path``.
        executable: Interpreter used to run the script; defaults to ``python``.

    Returns:
        The ``"txt_paths"`` item of the ``processing_inp`` child.
    """
    # path = macro.as_path() # This gives you the path based on the workflow name
    # The working directory is placed below the caller-supplied base_path instead.
    path = base_path
    macro.work_dir = generate_working_directory_overwrite(path, directory_name)
    # Alternative that creates a new numbered folder per run (note: it needs directory_name as well):
    # macro.working_directory = generate_working_directory_keep(path)
    macro.scripts = write_inp_input(script_path=script_path, 
                                    inp_path=inp_path, 
                                    working_directory=macro.work_dir)
    # Both inputs are the local copies produced by the `scripts` child.
    macro.processing_inp = run_in_python(executable=executable, 
                                         script_path=macro.scripts['script_path'], 
                                         inp_path=macro.scripts['inp_path'],
                                         working_directory=macro.work_dir)
    # macro.check = collect_output(working_directory=macro.work_dir)

    return macro.processing_inp["txt_paths"]

In [5]:
### Macro_node: activate the Fortran compiler and build the .exe

@Workflow.wrap.as_function_node
def write_compiler_input(compiler_path, fortran_file_path, working_directory):
    """Copy the Fortran source into ``working_directory``.

    Both input paths are made absolute first. The source keeps its file name
    in the working directory. Returns a dict with the absolute
    ``"compiler_path"`` and the local ``"fortran_path"``.
    """
    compiler_path, fortran_file_path = (
        os.path.abspath(p) for p in (compiler_path, fortran_file_path)
    )

    # Same file name, placed inside the working directory.
    local_fortran_file_path = os.path.join(
        working_directory, os.path.basename(fortran_file_path)
    )
    shutil.copy(fortran_file_path, local_fortran_file_path)

    return {"compiler_path":compiler_path, 
            "fortran_path":local_fortran_file_path}
    
@Workflow.wrap.as_function_node
def run_fortran_in_compiler(compiler_path, local_fortran_file_path, working_directory):
    """Compile a Fortran source with ``ifort`` inside the compiler environment.

    ``compiler_path`` is a Windows ``.lnk`` shortcut. It is parsed with
    ``pylnk3`` to find the environment batch file (plus its arguments). Then
    ``call "<bat>" <args> && ifort "<source>"`` is run through ``cmd /c`` with
    ``working_directory`` as current directory.

    Parameters:
        compiler_path: Path of the ``.lnk`` shortcut of the compiler environment.
        local_fortran_file_path: Fortran source file to compile.
        working_directory: Directory the compiler runs in and that is searched
            for the resulting ``.exe``.

    Returns:
        dict with ``"fortran_exe_filename"`` (file name) and ``"exe_path"``
        (absolute path) of the executable.

    Raises:
        ValueError: if the shortcut targets ``cmd.exe`` but no ``.bat`` path
            can be extracted from its arguments.
        subprocess.CalledProcessError: if the compile command exits non-zero
            (stdout/stderr are printed before re-raising).
        FileNotFoundError: if no ``.exe`` exists in ``working_directory``
            afterwards.
    """
    ## Parsing .lnk shortcuts (Windows)
    with open(compiler_path, 'rb') as f:
        lnk = pylnk3.parse(f)

    raw_path = lnk.path.strip('"')    # Executable path to which the shortcut points
    raw_args = lnk.arguments.strip()    # Shortcut parameters

    ## If .lnk points to cmd.exe, it is necessary to extract the .bat path from the parameter
    if raw_path.lower().endswith("cmd.exe"):
        match = re.search(r'"(?P<bat>C:.*?\.bat)"\s*(?P<args>[^"]*)', raw_args, re.IGNORECASE)
        if not match:
            raise ValueError(f"Unable to extract .bat file path from .lnk arguments with contents：{raw_args}")
        env_bat_path = match.group("bat")
        extra_args = match.group("args")
    else:
        env_bat_path = raw_path
        extra_args = raw_args

    ## Constructing the ifort compilation command
    fortran_path = os.path.normpath(local_fortran_file_path)
    compile_cmd = f'ifort "{fortran_path}"'

    # Set up the compiler environment first, then compile in the same shell.
    setup_cmd = f'call "{env_bat_path}" {extra_args} && {compile_cmd}'

    try:
        result = subprocess.run(
            f'cmd /c "{setup_cmd}"',
            cwd=working_directory,
            shell=True,
            check=True,
            text=True,
            capture_output=True
        )
        print("compiler output:\n", result.stdout)
        if result.stderr:
            print("warning:\n", result.stderr)
    except subprocess.CalledProcessError as e:
        print("error")
        print("stdout:\n", e.stdout)
        print("stderr:\n", e.stderr)
        raise

    exe_files = sorted(f for f in os.listdir(working_directory) if f.lower().endswith(".exe"))
    if not exe_files:
        raise FileNotFoundError(".exe file not found in the working directory")

    # No output name is passed to ifort, so the executable carries the source
    # file's base name. Prefer exactly that file: taking just "the first .exe"
    # could return a stale executable left in the directory by an earlier run.
    # Fall back to the first one (alphabetically) if the expected name is absent.
    expected_exe = os.path.splitext(os.path.basename(fortran_path))[0] + ".exe"
    matching = [f for f in exe_files if f.lower() == expected_exe.lower()]
    exe_filename = matching[0] if matching else exe_files[0]
    exe_path = os.path.abspath(os.path.join(working_directory, exe_filename))

    print(f"Finish, generated file: ['{exe_filename}']\n")

    return {"fortran_exe_filename": exe_filename,
            "exe_path": exe_path}


### Run Macro_node
@Workflow.wrap.as_macro_node("fortran_exe_filename", "exe_path")
def fortran_to_exe(macro, compiler_path, fortran_file_path, base_path, directory_name):
    """Macro: compile a Fortran source file and expose the resulting executable.

    Children, in creation order:
      * ``work_dir``       - generate_working_directory_overwrite(base_path, directory_name)
      * ``fortran_file``   - write_compiler_input: copies the source into ``work_dir``
      * ``fortran_to_exe`` - run_fortran_in_compiler: compiles the copied source

    Parameters:
        macro: The macro object the child nodes are attached to.
        compiler_path: Path of the compiler-environment ``.lnk`` shortcut.
        fortran_file_path: Fortran source file to compile.
        base_path: Parent directory of the working directory.
        directory_name: Name of the working directory below ``base_path``.

    Returns:
        The ``"fortran_exe_filename"`` and ``"exe_path"`` items of the
        ``fortran_to_exe`` child, in that order.
    """
    # path = macro.as_path() # This gives you the path based on the workflow name
    # The working directory is placed below the caller-supplied base_path instead.
    path = base_path
    macro.work_dir = generate_working_directory_overwrite(path, directory_name)
    # Alternative that creates a new numbered folder per run (note: it needs directory_name as well):
    # macro.working_directory = generate_working_directory_keep(path)
    macro.fortran_file = write_compiler_input(compiler_path=compiler_path, 
                                              fortran_file_path=fortran_file_path, 
                                              working_directory=macro.work_dir)
    # The compile step receives the absolute shortcut path and the local source copy.
    macro.fortran_to_exe = run_fortran_in_compiler(compiler_path=macro.fortran_file["compiler_path"], 
                                                   local_fortran_file_path=macro.fortran_file["fortran_path"],
                                                   working_directory=macro.work_dir)

    return (macro.fortran_to_exe["fortran_exe_filename"],
            macro.fortran_to_exe["exe_path"])

In [6]:
### Macro_node: running the .exe in the compiler environment

@Workflow.wrap.as_function_node
def write_running_exe_input(compiler_path, exe_path, working_directory, input_txt_paths_von_FAMOS=[], input_txt_paths_von_inp=[]):
    """Stage the executable and all text inputs in ``working_directory``.

    ``compiler_path`` and ``exe_path`` are made absolute. The executable and
    every listed text file are copied into ``working_directory`` under their
    original file names (FAMOS files first, then the inp-derived ones).

    Returns:
        dict with ``"compiler_path"`` (absolute), ``"exe_filename"`` (file
        name only) and ``"input_txt_set"`` (list of the copied text files'
        paths inside ``working_directory``).
    """
    compiler_path, exe_path = os.path.abspath(compiler_path), os.path.abspath(exe_path)

    # The executable keeps its file name inside the working directory.
    exe_filename = os.path.basename(exe_path)
    shutil.copy(exe_path, os.path.join(working_directory, exe_filename))

    # Concatenation builds a new list, so the (shared) default lists are never modified.
    local_txt_files = []
    txt_sources = input_txt_paths_von_FAMOS + input_txt_paths_von_inp
    for source in txt_sources:
        local_txt_files.append(os.path.join(working_directory, os.path.basename(source)))
        shutil.copy(os.path.abspath(source), local_txt_files[-1])

    print(local_txt_files)

    return {"compiler_path": compiler_path,
            "exe_filename": exe_filename,
            "input_txt_set": local_txt_files}


@Workflow.wrap.as_function_node
def run_exe(compiler_path, exe_filename, working_directory):
    """Run an executable inside the compiler environment and list its .odb outputs.

    ``compiler_path`` is a Windows ``.lnk`` shortcut. It is parsed with
    ``pylnk3`` to find the environment batch file (plus its arguments). Then
    ``call "<bat>" <args> && cd /d "<working_directory>" && "<exe_filename>"``
    is run through ``cmd /c``.

    Parameters:
        compiler_path: Path of the ``.lnk`` shortcut of the compiler environment.
        exe_filename: File name of the executable inside ``working_directory``.
        working_directory: Directory the executable runs in and that is
            searched for ``.odb`` files afterwards.

    Returns:
        dict with key ``"output_files"``: paths (``working_directory`` joined
        with the file name) of all ``.odb`` files, sorted by name.

    Raises:
        ValueError: if the shortcut targets ``cmd.exe`` but no ``.bat`` path
            can be extracted from its arguments.
        subprocess.CalledProcessError: if the command exits non-zero
            (stdout/stderr are printed before re-raising).
    """
    with open(compiler_path, 'rb') as f:
        lnk = pylnk3.parse(f)

    raw_path = lnk.path.strip('"')
    raw_args = lnk.arguments.strip()

    # A shortcut that starts cmd.exe carries the real batch file in its arguments.
    if raw_path.lower().endswith("cmd.exe"):
        match = re.search(r'"(?P<bat>C:.*?\.bat)"\s*(?P<args>[^"]*)', raw_args, re.IGNORECASE)
        if not match:
            raise ValueError(f"Unable to extract .bat file path from .lnk arguments with contents：{raw_args}")
        env_bat_path = match.group("bat")
        extra_args = match.group("args").strip('"')
    else:
        env_bat_path = raw_path
        extra_args = raw_args

    ## First call the compiler environment variable
    ## Then cd to the working directory
    ## Finally, execute the .exe file
    run_cmd = f'call "{env_bat_path}" {extra_args} && cd /d "{working_directory}" && "{exe_filename}"'

    try:
        result = subprocess.run(
            f'cmd /c "{run_cmd}"',
            cwd=working_directory,
            shell=True,
            check=True,
            capture_output=True,
            text=True
        )
        print(result.stdout)
        print(result.stderr)

    except subprocess.CalledProcessError as e:
        print("error：")
        print("STDOUT:\n", e.stdout)
        print("STDERR:\n", e.stderr)
        raise

    # Extension is compared case-insensitively, like the other file filters in
    # this notebook; sorting makes the result order reproducible.
    outputs = sorted(f for f in os.listdir(working_directory) if f.lower().endswith('.odb'))
    print(f"Finish, generated files: {outputs}\n")

    # This is a plain function, not a method: the paths are built from the
    # working_directory argument (the former `self.working_directory` raised
    # NameError as soon as the executable had finished successfully).
    return {"output_files": [os.path.join(working_directory, f) for f in outputs]}


### Run Macro_node
@Workflow.wrap.as_macro_node("output_files")
def running_exe_in_compiler(macro, compiler_path, exe_path, base_path, directory_name,
                            input_txt_paths_von_FAMOS, input_txt_paths_von_inp=[]):
    """Macro: run a compiled executable with its text inputs and expose the outputs.

    Children, in creation order:
      * ``work_dir``        - generate_working_directory_overwrite(base_path, directory_name)
      * ``write_exe_input`` - write_running_exe_input: copies the executable and
        all text inputs into ``work_dir``
      * ``fortran_to_exe``  - run_exe: runs the copied executable

    Parameters:
        macro: The macro object the child nodes are attached to.
        compiler_path: Path of the compiler-environment ``.lnk`` shortcut.
        exe_path: Executable to run.
        base_path: Parent directory of the working directory.
        directory_name: Name of the working directory below ``base_path``.
        input_txt_paths_von_FAMOS: Text input files (required).
        input_txt_paths_von_inp: Further text input files; defaults to an
            empty list.

    Returns:
        The ``"output_files"`` item of the run_exe child.
    """
    # path = macro.as_path() # This gives you the path based on the workflow name
    # The working directory is placed below the caller-supplied base_path instead.
    path = base_path
    macro.work_dir = generate_working_directory_overwrite(path, directory_name)
    # Alternative that creates a new numbered folder per run (note: it needs directory_name as well):
    # macro.working_directory = generate_working_directory_keep(path)
    macro.write_exe_input = write_running_exe_input(compiler_path=compiler_path, 
                                                    exe_path=exe_path, 
                                                    working_directory=macro.work_dir,
                                                    input_txt_paths_von_FAMOS=input_txt_paths_von_FAMOS,
                                                    input_txt_paths_von_inp=input_txt_paths_von_inp)
    # NOTE(review): this child is labelled `fortran_to_exe` although it wraps
    # run_exe (no compilation happens here). The label is what shows up in
    # error paths such as ".../running_exe_in_compiler/fortran_to_exe";
    # consider renaming it once nothing depends on the label.
    macro.fortran_to_exe = run_exe(compiler_path=macro.write_exe_input["compiler_path"], 
                                   exe_filename=macro.write_exe_input["exe_filename"],
                                   working_directory=macro.work_dir)

    return (macro.fortran_to_exe["output_files"])

In [129]:
# Build and run the complete simulation workflow:
#   Abaqus script -> .inp/.cae -> processed .txt files -> compiled Fortran .exe -> run .exe
if __name__ == '__main__':
    wf = Workflow("Simulation_Workflow")
    
    # NOTE: all paths below are absolute and machine-specific; adjust them
    # before running this cell on another computer.
    project_path = r"C:\Local_Dong\Projekt\AnAttAl\CAD\AnAttAl_CAD_OnlyWorkflow_demo"
    abaqus_python_script = r"C:\Local_Dong\Projekt\AnAttAl\CAD\abaqusMacros.py"
    processing_inp_python_script = r"C:\Local_Dong\Projekt\AnAttAl\CAD\File_Programm3_mK.py"
    # Windows shortcut (.lnk) of the Intel compiler environment; parsed by the compile/run nodes.
    fortran_compiler_path = r"C:\Local_Dong\Projekt\AnAttAl\CAD\Compiler 16.0 Update 1 for Intel 64 Visual Studio 2015 environment.lnk"
    fortran_file = r"C:\Local_Dong\Projekt\AnAttAl\CAD\Testfall_1ms_modKInc_Dyn5_oheneingeben.for"

    # Step 1: run the Abaqus script; outputs "inp_path" and "cae_path".
    wf.inp_data = export_inp_cae(script_path=abaqus_python_script, 
                                 base_path=project_path,
                                 directory_name="Python_marco_to_inp")
    
    # Step 2: process the .inp file from step 1; output "txt_paths".
    wf.processing_inp = processing_inp(script_path=processing_inp_python_script, 
                                       inp_path=wf.inp_data.outputs["inp_path"], 
                                       base_path=project_path,
                                       directory_name="Python_marco_processing_inp")
    
    # Step 3: compile the Fortran source; takes no input from steps 1-2.
    wf.fortran_to_exe = fortran_to_exe(compiler_path=fortran_compiler_path, 
                                       fortran_file_path=fortran_file, 
                                       base_path=project_path, 
                                       directory_name="Fortran_to_exe")
    
    # Additional text inputs that are copied next to the executable in step 4.
    input_txts_FAMOS = [r"C:\Local_Dong\Projekt\AnAttAl\CAD\Stromverlauf.txt",
                        r"C:\Local_Dong\Projekt\AnAttAl\CAD\Kraftverlauf.txt",
                        r"C:\Local_Dong\Projekt\AnAttAl\CAD\Potentialverlauf.txt"]
    
    # Step 4: run the executable from step 3 with the FAMOS files and the .txt files from step 2.
    wf.running_exe_in_compiler = running_exe_in_compiler(compiler_path=fortran_compiler_path,
                                                         exe_path=wf.fortran_to_exe.outputs["exe_path"],
                                                         base_path=project_path,
                                                         directory_name="running_exe_in_compiler",
                                                         input_txt_paths_von_FAMOS=input_txts_FAMOS,
                                                         input_txt_paths_von_inp=wf.processing_inp.outputs["txt_paths"])
    
    # Execute the whole graph.
    wf.run()

    # print(wf.inp_data.outputs["inp_path"])

[INFO] Working directory: C:\Local_Dong\Projekt\AnAttAl\CAD\AnAttAl_CAD_OnlyWorkflow_demo\Fortran_to_exe
[INFO] (Later run) Directory already exists — keep using it (no delete).
compiler output:
 
Intel(R) MPI Library 5.1 Update 2 for Windows* Target Build Environment for Intel(R) 64 applications
Copyright (C) 2007-2015 Intel Corporation. All rights reserved.

Copyright (C) 1985-2015 Intel Corporation. All rights reserved.
Intel(R) Compiler 16.0 Update 1 (package 146)
 

Microsoft (R) Incremental Linker Version 14.00.24215.1
Copyright (C) Microsoft Corporation.  All rights reserved.

-out:Testfall_1ms_modKInc_Dyn5_oheneingeben.exe 
-subsystem:console 
Testfall_1ms_modKInc_Dyn5_oheneingeben.obj 

 Intel(R) Visual Fortran Intel(R) 64 Compiler for applications running on Intel(R) 64, Version 16.0.1.146 Build 20151021
Copyright (C) 1985-2015 Intel Corporation.  All rights reserved.


Finish, generated file: ['Testfall_1ms_modKInc_Dyn5_oheneingeben.exe']

[INFO] Working directory: C:\Local_

FailedChildError: /Simulation_Workflow encountered error in child: {'/Simulation_Workflow/running_exe_in_compiler.accumulate_and_run': FailedChildError('/Simulation_Workflow/running_exe_in_compiler encountered error in child: {\'/Simulation_Workflow/running_exe_in_compiler/fortran_to_exe.accumulate_and_run\': CalledProcessError(24, \'cmd /c "call "C:\\\\Program Files (x86)\\\\IntelSWTools\\\\compilers_and_libraries_2016.1.146\\\\windows\\\\bin\\\\ipsxe-comp-vars.bat" intel64 vs2015 && cd /d "C:\\\\Local_Dong\\\\Projekt\\\\AnAttAl\\\\CAD\\\\AnAttAl_CAD_OnlyWorkflow_demo\\\\running_exe_in_compiler" && "Testfall_1ms_modKInc_Dyn5_oheneingeben.exe""\')}')}