From 4fbf64f6af6da05a87e77b257695cd3000046196 Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Wed, 12 Feb 2025 16:40:57 -1000 Subject: [PATCH 01/15] Added airflow translator (WIP). --- wfcommons/wfbench/__init__.py | 2 +- wfcommons/wfbench/translator/__init__.py | 1 + wfcommons/wfbench/translator/airflow.py | 93 ++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 wfcommons/wfbench/translator/airflow.py diff --git a/wfcommons/wfbench/__init__.py b/wfcommons/wfbench/__init__.py index bf670d71..c376b0c8 100644 --- a/wfcommons/wfbench/__init__.py +++ b/wfcommons/wfbench/__init__.py @@ -9,4 +9,4 @@ # (at your option) any later version. from .bench import WorkflowBenchmark -from .translator import DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator +from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator diff --git a/wfcommons/wfbench/translator/__init__.py b/wfcommons/wfbench/translator/__init__.py index f5af2d5f..00886f69 100644 --- a/wfcommons/wfbench/translator/__init__.py +++ b/wfcommons/wfbench/translator/__init__.py @@ -8,6 +8,7 @@ # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. +from .airflow import AirflowTranslator from .dask import DaskTranslator from .nextflow import NextflowTranslator from .parsl import ParslTranslator diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py new file mode 100644 index 00000000..5c3216c2 --- /dev/null +++ b/wfcommons/wfbench/translator/airflow.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2021-2024 The WfCommons Team. 
+# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +import pathlib +import re + +from logging import Logger +from typing import Optional, Union + +from .abstract_translator import Translator +from ...common import Workflow + +class AirflowTranslator(Translator): + """ + A WfFormat parser for creating Nextflow workflow applications. + + :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance. + :type workflow: Union[Workflow, pathlib.Path], + :param logger: The logger where to log information/warning or errors (optional). + :type logger: Logger + """ + + def __init__(self, + workflow: Union[Workflow, pathlib.Path], + logger: Optional[Logger] = None, + input_file_directory: pathlib.Path = pathlib.Path("/")) -> None: + """Create an object of the translator.""" + super().__init__(workflow, logger) + + self.input_file_directory = input_file_directory + self._prep_commands() + + self.script = f""" +from __future__ import annotations + +from datetime import datetime +from airflow.models.dag import DAG +from airflow.operators.bash import BashOperator + +with DAG( + "{self.workflow.name}_wfcommons", + description="airflow translation of a wfcommons instance", + schedule="0 0 * * *", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["wfcommons"], +) as dag: +""" + + def translate(self, output_file_path: pathlib.Path) -> None: + """ + Translate a workflow benchmark description(WfFormat) into a Airflow workflow application. + + : param output_file_path: The name of the output file(e.g., workflow.py). 
+ : type output_file_path: pathlib.Path + """ + + for task in self.tasks: + self.script += f""" + {task} = BashOperator( + task_id="{task}", + depends_on_past=False, + bash_command="{self.task_commands[task]}", + retries=3, + ) +""" + for task in self.tasks: + parents = ", ".join(self.task_parents[task]) + if parents: + self.script += f""" + [{parents}] >> {task} +""" + + self._write_output_file(self.script, output_file_path) + + def _prep_commands(self): + self.task_commands = {} + for task in self.workflow.workflow_json["workflow"]["execution"]["tasks"]: + command_str = " ".join([task["command"]["program"]] + task["command"]["arguments"]) + # Prepends { and } with \" (i.e. {hi} -> \"{hi\"} + command_str = re.sub(r"(\{|\})", r"\"\1", command_str) + # Prepends txt filenames with absolute path + command_str = re.sub(r"([\w\-]+\.txt)", + lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", + command_str) + self.task_commands[task["id"]] = command_str \ No newline at end of file From b371be34eef2ffae814b8807e148167474881b0f Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Wed, 19 Feb 2025 16:22:58 -1000 Subject: [PATCH 02/15] Added airflow dockfile (WIP). --- tests/translators/Dockerfile_Airflow | 59 ++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/translators/Dockerfile_Airflow diff --git a/tests/translators/Dockerfile_Airflow b/tests/translators/Dockerfile_Airflow new file mode 100644 index 00000000..868e513a --- /dev/null +++ b/tests/translators/Dockerfile_Airflow @@ -0,0 +1,59 @@ +# docker build -t wfcommons-dev -f Dockerfile_Airflow . 
+# docker run -it --rm -v .:/home/wfcommons wfcommons-dev /bin/bash + +FROM ubuntu:jammy + +LABEL org.containers.image.authors="sukaryor@hawaii.edu" + +# update repositories +RUN apt-get update + +# set timezone +RUN echo "America/Los_Angeles" > /etc/timezone && export DEBIAN_FRONTEND=noninteractive && apt-get install -y tzdata + +# install useful stuff +RUN apt-get -y install pkg-config +RUN apt-get -y install git +RUN apt-get -y install wget +RUN apt-get -y install make +RUN apt-get -y install cmake +RUN apt-get -y install cmake-data +RUN apt-get -y install sudo +RUN apt-get -y install vim --fix-missing +RUN apt-get -y install gcc +RUN apt-get -y install gcc-multilib + +# Python stuff +RUN apt-get -y install python3 python3-pip +RUN python3 -m pip install pathos pandas filelock +RUN python3 -m pip install networkx scipy matplotlib +RUN python3 -m pip install pyyaml jsonschema requests +RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 + +# Stress-ng +RUN apt-get -y install stress-ng + +# Install Airflow +RUN python3 -m pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" + +# Install MySQL/MyClient +RUN apt-get install -y mysql-server +RUN apt-get install -y python3-dev build-essential +RUN apt-get install -y default-libmysqlclient-dev +RUN python3 -m pip install mysqlclient + +# Setup database for Airflow +RUN mysqld --explicit-defaults-for-timestamp +RUN mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"; +RUN mysql -u root -e "CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass'"; +RUN mysql -u root -e "GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user'"; + +# Add wfcommons user +RUN useradd -ms /bin/bash wfcommons +RUN adduser wfcommons sudo +RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers +ENV PATH="$PATH:/home/wfcommons/.local/bin/" + +USER wfcommons +WORKDIR /home/wfcommons + From 
7c3ba0ffea06bd98a09d1ea17cd8fcdd3c935c5d Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 19 Feb 2025 17:16:41 -1000 Subject: [PATCH 03/15] Fixed Airflow dockerfile --- tests/translators/Dockerfile_Airflow | 42 ++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/tests/translators/Dockerfile_Airflow b/tests/translators/Dockerfile_Airflow index 868e513a..a4316dfc 100644 --- a/tests/translators/Dockerfile_Airflow +++ b/tests/translators/Dockerfile_Airflow @@ -1,7 +1,7 @@ # docker build -t wfcommons-dev -f Dockerfile_Airflow . # docker run -it --rm -v .:/home/wfcommons wfcommons-dev /bin/bash -FROM ubuntu:jammy +FROM ubuntu:noble LABEL org.containers.image.authors="sukaryor@hawaii.edu" @@ -21,32 +21,51 @@ RUN apt-get -y install cmake-data RUN apt-get -y install sudo RUN apt-get -y install vim --fix-missing RUN apt-get -y install gcc -RUN apt-get -y install gcc-multilib +#RUN apt-get -y install gcc-multilib # Python stuff RUN apt-get -y install python3 python3-pip -RUN python3 -m pip install pathos pandas filelock -RUN python3 -m pip install networkx scipy matplotlib -RUN python3 -m pip install pyyaml jsonschema requests +RUN python3 -m pip install --break-system-packages pathos pandas filelock +RUN python3 -m pip install --break-system-packages networkx scipy matplotlib +RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 # Stress-ng RUN apt-get -y install stress-ng # Install Airflow -RUN python3 -m pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" +RUN python3 -m pip install --break-system-packages apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" # Install MySQL/MyClient RUN apt-get install -y mysql-server RUN apt-get install -y python3-dev build-essential 
RUN apt-get install -y default-libmysqlclient-dev -RUN python3 -m pip install mysqlclient +RUN python3 -m pip install --break-system-packages mysqlclient + +# Create an entrypoint script to start mysqld in the background +# and setup the Airflow DB +RUN echo '#!/bin/bash' > /entrypoint.sh && \ + echo 'sudo mysqld --explicit-defaults-for-timestamp &' >> /entrypoint.sh && \ + echo 'until sudo mysqladmin ping -h localhost --silent; do' >> /entrypoint.sh && \ + echo ' echo "Waiting for MySQL to be ready..."' >> /entrypoint.sh && \ + echo ' sleep 2' >> /entrypoint.sh && \ + echo 'done' >> /entrypoint.sh && \ + echo 'echo "MySQL is ready!"' >> /entrypoint.sh && \ + echo 'echo "Setting up database for Airflow..."' >> /entrypoint.sh && \ + echo 'sudo mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";' >> /entrypoint.sh && \ + echo "sudo mysql -u root -e \"CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';\"" >> /entrypoint.sh && \ + echo "sudo mysql -u root -e \"GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';\"" >> /entrypoint.sh && \ + echo "airflow config list 1> /dev/null" >> /entrypoint.sh && \ + echo "sed -i ./airflow/airflow.cfg -e 's/sqlite:.*/mysql+mysqldb:\/\/airflow_user:airflow_pass@localhost:3306\/airflow_db/'" >> /entrypoint.sh && \ + echo 'echo "Airflow database setup!"' >> /entrypoint.sh && \ + echo 'exec bash' >> /entrypoint.sh && \ + chmod +x /entrypoint.sh # Setup database for Airflow -RUN mysqld --explicit-defaults-for-timestamp -RUN mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"; -RUN mysql -u root -e "CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass'"; -RUN mysql -u root -e "GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user'"; +#RUN mysqld --explicit-defaults-for-timestamp +#RUN mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"; +#RUN mysql -u root -e "CREATE USER 'airflow_user' IDENTIFIED 
BY 'airflow_pass'"; +#RUN mysql -u root -e "GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user'"; # Add wfcommons user RUN useradd -ms /bin/bash wfcommons @@ -57,3 +76,4 @@ ENV PATH="$PATH:/home/wfcommons/.local/bin/" USER wfcommons WORKDIR /home/wfcommons +ENTRYPOINT ["/entrypoint.sh"] From 6ff0533b8516c0cc8748efd16dd4126860a1b2d3 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 12 Mar 2025 16:40:48 -1000 Subject: [PATCH 04/15] Upgraded Airflow translater to new WfBench command-line arguments --- wfcommons/wfbench/translator/airflow.py | 85 ++++++++++++++++++------- 1 file changed, 63 insertions(+), 22 deletions(-) diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index 5c3216c2..447170bf 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -10,6 +10,8 @@ import pathlib import re +import ast +import json from logging import Logger from typing import Optional, Union @@ -30,13 +32,13 @@ class AirflowTranslator(Translator): def __init__(self, workflow: Union[Workflow, pathlib.Path], logger: Optional[Logger] = None, - input_file_directory: pathlib.Path = pathlib.Path("/")) -> None: + # input_file_directory: pathlib.Path = pathlib.Path("/") + ) -> None: """Create an object of the translator.""" super().__init__(workflow, logger) - self.input_file_directory = input_file_directory - self._prep_commands() - + # self.input_file_directory = input_file_directory + # self.script = f""" from __future__ import annotations @@ -54,40 +56,79 @@ def __init__(self, ) as dag: """ - def translate(self, output_file_path: pathlib.Path) -> None: + def translate(self, output_folder: pathlib.Path) -> None: """ Translate a workflow benchmark description(WfFormat) into a Airflow workflow application. - : param output_file_path: The name of the output file(e.g., workflow.py). - : type output_file_path: pathlib.Path + : param output_folder: The name of the output folder. 
+ : type output_folder: pathlib.Path """ - for task in self.tasks: + self._prep_commands(output_folder) + + for task in self.tasks.values(): self.script += f""" - {task} = BashOperator( - task_id="{task}", + {task.task_id} = BashOperator( + task_id="{task.task_id}", depends_on_past=False, - bash_command="{self.task_commands[task]}", + bash_command="{self.task_commands[task.task_id]}", retries=3, ) """ - for task in self.tasks: - parents = ", ".join(self.task_parents[task]) + for task in self.tasks.values(): + parents = ", ".join(self.task_parents[task.task_id]) if parents: self.script += f""" - [{parents}] >> {task} + [{parents}] >> {task.task_id} """ + # write benchmark files + output_folder.mkdir(parents=True) + with open(output_folder.joinpath("workflow.py"), "w") as fp: + fp.write(self.script) - self._write_output_file(self.script, output_file_path) + # additional files + self._copy_binary_files(output_folder) + self._generate_input_files(output_folder) - def _prep_commands(self): + def _prep_commands(self, output_folder: pathlib.Path) -> None: self.task_commands = {} - for task in self.workflow.workflow_json["workflow"]["execution"]["tasks"]: - command_str = " ".join([task["command"]["program"]] + task["command"]["arguments"]) + + for task in self.tasks.values(): + # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files] + # output_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.output_files] + program = output_folder.joinpath(f'bin/{task.program}') + args = [] + for a in task.args: + if "--output-files" in a: + flag, output_files_dict = a.split(" ", 1) + output_files_dict = {str(output_folder.joinpath(f"data/{key}")): value for key, value in ast.literal_eval(output_files_dict).items()} + a = f"{flag} '{json.dumps(output_files_dict)}'" + elif "--input-files" in a: + flag, input_files_arr = a.split(" ", 1) + input_files_arr = [str(output_folder.joinpath(f"data/{file}")) for file in 
ast.literal_eval(input_files_arr)] + a = f"{flag} '{json.dumps(input_files_arr)}'" + else: + a = a.replace("'", "\"") + args.append(a) + + command_str = " ".join([str(program)] + args) # Prepends { and } with \" (i.e. {hi} -> \"{hi\"} command_str = re.sub(r"(\{|\})", r"\"\1", command_str) # Prepends txt filenames with absolute path - command_str = re.sub(r"([\w\-]+\.txt)", - lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", - command_str) - self.task_commands[task["id"]] = command_str \ No newline at end of file + # command_str = re.sub(r"([\w\-]+\.txt)", + # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", + # command_str) + self.task_commands[task.task_id] = command_str + + + # def _prep_commands(self): + # self.task_commands = {} + # for task in self.workflow.workflow_json["workflow"]["execution"]["tasks"]: + # command_str = " ".join([task["command"]["program"]] + task["command"]["arguments"]) + # # Prepends { and } with \" (i.e. {hi} -> \"{hi\"} + # command_str = re.sub(r"(\{|\})", r"\"\1", command_str) + # # Prepends txt filenames with absolute path + # command_str = re.sub(r"([\w\-]+\.txt)", + # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", + # command_str) + # self.task_commands[task["id"]] = command_str \ No newline at end of file From a671f7258ea573e8afe2e93f1b6c1726a74840db Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Wed, 9 Apr 2025 16:21:10 -1000 Subject: [PATCH 05/15] Added airflow dockfile (WIP). 
--- tests/translators/Airflow_README | 43 +++++++++++++++++++++++++ tests/translators/Dockerfile_Airflow | 40 +++++++++++------------ wfcommons/wfbench/translator/airflow.py | 10 +++--- 3 files changed, 66 insertions(+), 27 deletions(-) create mode 100644 tests/translators/Airflow_README diff --git a/tests/translators/Airflow_README b/tests/translators/Airflow_README new file mode 100644 index 00000000..b6605365 --- /dev/null +++ b/tests/translators/Airflow_README @@ -0,0 +1,43 @@ +Download wfcommons: + pip install wfcommons + +Download airflow with: + pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" + +Download MySQL and MySQLClient: + apt-get -y install pkg-config + apt-get install -y mysql-server + apt-get install -y python3-dev build-essential + apt-get install -y default-libmysqlclient-dev + pip install mysqlclient + +Setup database for Airflow: + mysqld --explicit-defaults-for-timestamp & + In MySQL: + CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass'; + GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user'; + +Set airflow home directory: + export AIRFLOW_HOME="$(pwd)" + +Edit AIRFLOW_HOME/airflow.cfg (may need to run 'airflow dags list' to make the file appear): + sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost:3306/airflow_db + +Finish setting up database: + airflow db migrate + +Move the translated workflow.py file to AIRFLOW_HOME/dags/ + +Run workflow: + airflow dags test {workflow_name} + + + + + +(and running the translator) +Initialize the AirflowTranslator with the workflow to translate and the directory that will contain the workflow's input files. +Build the Airflow Docker container and run it in the directory with the translated dag. (Make sure there isn't a pre-existing 'airflow' directory.) + docker build -t wfcommons-dev -f Dockerfile_Airflow . 
+ docker run -it --rm -v .:/home/wfcommons wfcommons-dev /bin/bash \ No newline at end of file diff --git a/tests/translators/Dockerfile_Airflow b/tests/translators/Dockerfile_Airflow index a4316dfc..f91bc0bf 100644 --- a/tests/translators/Dockerfile_Airflow +++ b/tests/translators/Dockerfile_Airflow @@ -1,5 +1,5 @@ # docker build -t wfcommons-dev -f Dockerfile_Airflow . -# docker run -it --rm -v .:/home/wfcommons wfcommons-dev /bin/bash +# docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash FROM ubuntu:noble @@ -21,7 +21,7 @@ RUN apt-get -y install cmake-data RUN apt-get -y install sudo RUN apt-get -y install vim --fix-missing RUN apt-get -y install gcc -#RUN apt-get -y install gcc-multilib +RUN apt-get -y install gcc-multilib # Python stuff RUN apt-get -y install python3 python3-pip @@ -33,47 +33,43 @@ RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 # Stress-ng RUN apt-get -y install stress-ng +# WfCommons +RUN python3 -m pip install wfcommons + # Install Airflow -RUN python3 -m pip install --break-system-packages apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" +RUN python3 -m pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" # Install MySQL/MyClient RUN apt-get install -y mysql-server RUN apt-get install -y python3-dev build-essential RUN apt-get install -y default-libmysqlclient-dev -RUN python3 -m pip install --break-system-packages mysqlclient +RUN python3 -m pip install mysqlclient + +# Setup directory +RUN mkdir /home/wfcommons # Create an entrypoint script to start mysqld in the background # and setup the Airflow DB RUN echo '#!/bin/bash' > /entrypoint.sh && \ - echo 'sudo mysqld --explicit-defaults-for-timestamp &' >> /entrypoint.sh && \ - echo 'until sudo mysqladmin ping -h localhost --silent; do' >> /entrypoint.sh && \ + echo 'mysqld 
--explicit-defaults-for-timestamp &' >> /entrypoint.sh && \ + echo 'until mysqladmin ping -h localhost --silent; do' >> /entrypoint.sh && \ echo ' echo "Waiting for MySQL to be ready..."' >> /entrypoint.sh && \ echo ' sleep 2' >> /entrypoint.sh && \ echo 'done' >> /entrypoint.sh && \ echo 'echo "MySQL is ready!"' >> /entrypoint.sh && \ echo 'echo "Setting up database for Airflow..."' >> /entrypoint.sh && \ - echo 'sudo mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";' >> /entrypoint.sh && \ - echo "sudo mysql -u root -e \"CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';\"" >> /entrypoint.sh && \ - echo "sudo mysql -u root -e \"GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';\"" >> /entrypoint.sh && \ + echo 'mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";' >> /entrypoint.sh && \ + echo "mysql -u root -e \"CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';\"" >> /entrypoint.sh && \ + echo "mysql -u root -e \"GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';\"" >> /entrypoint.sh && \ + echo 'export AIRFLOW_HOME="$(pwd)"/home/wfcommons/airflow/' >> /entrypoint.sh && \ echo "airflow config list 1> /dev/null" >> /entrypoint.sh && \ echo "sed -i ./airflow/airflow.cfg -e 's/sqlite:.*/mysql+mysqldb:\/\/airflow_user:airflow_pass@localhost:3306\/airflow_db/'" >> /entrypoint.sh && \ + echo 'airflow db migrate' >> /entrypoint.sh && \ echo 'echo "Airflow database setup!"' >> /entrypoint.sh && \ + echo 'mkdir ./airflow/dags' >> /entrypoint.sh && \ echo 'exec bash' >> /entrypoint.sh && \ chmod +x /entrypoint.sh -# Setup database for Airflow -#RUN mysqld --explicit-defaults-for-timestamp -#RUN mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"; -#RUN mysql -u root -e "CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass'"; -#RUN mysql -u root -e "GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user'"; - -# 
Add wfcommons user -RUN useradd -ms /bin/bash wfcommons -RUN adduser wfcommons sudo -RUN echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers -ENV PATH="$PATH:/home/wfcommons/.local/bin/" - -USER wfcommons WORKDIR /home/wfcommons ENTRYPOINT ["/entrypoint.sh"] diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index 447170bf..89ed0d17 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -47,7 +47,7 @@ def __init__(self, from airflow.operators.bash import BashOperator with DAG( - "{self.workflow.name}_wfcommons", + "{self.workflow.name}", description="airflow translation of a wfcommons instance", schedule="0 0 * * *", start_date=datetime(2021, 1, 1), @@ -71,7 +71,7 @@ def translate(self, output_folder: pathlib.Path) -> None: {task.task_id} = BashOperator( task_id="{task.task_id}", depends_on_past=False, - bash_command="{self.task_commands[task.task_id]}", + bash_command='{self.task_commands[task.task_id]}', retries=3, ) """ @@ -102,18 +102,18 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None: if "--output-files" in a: flag, output_files_dict = a.split(" ", 1) output_files_dict = {str(output_folder.joinpath(f"data/{key}")): value for key, value in ast.literal_eval(output_files_dict).items()} - a = f"{flag} '{json.dumps(output_files_dict)}'" + a = f"{flag} {json.dumps(output_files_dict)}" elif "--input-files" in a: flag, input_files_arr = a.split(" ", 1) input_files_arr = [str(output_folder.joinpath(f"data/{file}")) for file in ast.literal_eval(input_files_arr)] - a = f"{flag} '{json.dumps(input_files_arr)}'" + a = f"{flag} {json.dumps(input_files_arr)}" else: a = a.replace("'", "\"") args.append(a) command_str = " ".join([str(program)] + args) # Prepends { and } with \" (i.e. 
{hi} -> \"{hi\"} - command_str = re.sub(r"(\{|\})", r"\"\1", command_str) + # command_str = re.sub(r"(\{|\})", r"\"\1", command_str) # Prepends txt filenames with absolute path # command_str = re.sub(r"([\w\-]+\.txt)", # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", From 793ba2bfc835a9ae4d1174e658062e33a2c16fa2 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 9 Apr 2025 16:31:51 -1000 Subject: [PATCH 06/15] Re-org Airflow translator test stuff --- tests/translators/{ => airflow}/Dockerfile_Airflow | 0 tests/translators/{Airflow_README => airflow/README} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename tests/translators/{ => airflow}/Dockerfile_Airflow (100%) rename tests/translators/{Airflow_README => airflow/README} (100%) diff --git a/tests/translators/Dockerfile_Airflow b/tests/translators/airflow/Dockerfile_Airflow similarity index 100% rename from tests/translators/Dockerfile_Airflow rename to tests/translators/airflow/Dockerfile_Airflow diff --git a/tests/translators/Airflow_README b/tests/translators/airflow/README similarity index 100% rename from tests/translators/Airflow_README rename to tests/translators/airflow/README From e18e571d8914cc324acf7dbea2788d20fdcb19ad Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Wed, 9 Apr 2025 16:35:21 -1000 Subject: [PATCH 07/15] Cosmetic code fixes --- wfcommons/wfbench/translator/airflow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index 89ed0d17..d07cdb22 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -58,10 +58,10 @@ def __init__(self, def translate(self, output_folder: pathlib.Path) -> None: """ - Translate a workflow benchmark description(WfFormat) into a Airflow workflow application. + Translate a workflow benchmark description(WfFormat) into an Airflow workflow application. 
- : param output_folder: The name of the output folder. - : type output_folder: pathlib.Path + :param output_folder: The name of the output folder. + :type output_folder: pathlib.Path """ self._prep_commands(output_folder) From a4f23065e1413d862c1638aaceb0b49ffff1b385 Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Mon, 14 Apr 2025 09:19:03 -1000 Subject: [PATCH 08/15] Put --break-system-packages back. --- tests/translators/Dockerfile_Airflow | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/translators/Dockerfile_Airflow b/tests/translators/Dockerfile_Airflow index f91bc0bf..d8681c41 100644 --- a/tests/translators/Dockerfile_Airflow +++ b/tests/translators/Dockerfile_Airflow @@ -34,15 +34,18 @@ RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1 RUN apt-get -y install stress-ng # WfCommons +RUN python3 -m pip install --break-system-packages wfcommons RUN python3 -m pip install wfcommons # Install Airflow +RUN python3 -m pip install --break-system-packages apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" RUN python3 -m pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" # Install MySQL/MyClient RUN apt-get install -y mysql-server RUN apt-get install -y python3-dev build-essential RUN apt-get install -y default-libmysqlclient-dev +RUN python3 -m pip install --break-system-packages mysqlclient RUN python3 -m pip install mysqlclient # Setup directory From f9fc1f79c489c467a55f9f00cbeff05d5c483460 Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Mon, 14 Apr 2025 15:19:03 -1000 Subject: [PATCH 09/15] Got airflow translator to work. 
--- tests/translators/airflow/Dockerfile_Airflow | 5 +---- wfcommons/wfbench/translator/airflow.py | 9 ++++++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/translators/airflow/Dockerfile_Airflow b/tests/translators/airflow/Dockerfile_Airflow index d8681c41..f6d63774 100644 --- a/tests/translators/airflow/Dockerfile_Airflow +++ b/tests/translators/airflow/Dockerfile_Airflow @@ -35,18 +35,15 @@ RUN apt-get -y install stress-ng # WfCommons RUN python3 -m pip install --break-system-packages wfcommons -RUN python3 -m pip install wfcommons # Install Airflow RUN python3 -m pip install --break-system-packages apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" -RUN python3 -m pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" # Install MySQL/MyClient RUN apt-get install -y mysql-server RUN apt-get install -y python3-dev build-essential RUN apt-get install -y default-libmysqlclient-dev RUN python3 -m pip install --break-system-packages mysqlclient -RUN python3 -m pip install mysqlclient # Setup directory RUN mkdir /home/wfcommons @@ -64,7 +61,7 @@ RUN echo '#!/bin/bash' > /entrypoint.sh && \ echo 'mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";' >> /entrypoint.sh && \ echo "mysql -u root -e \"CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';\"" >> /entrypoint.sh && \ echo "mysql -u root -e \"GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';\"" >> /entrypoint.sh && \ - echo 'export AIRFLOW_HOME="$(pwd)"/home/wfcommons/airflow/' >> /entrypoint.sh && \ + echo 'export AIRFLOW_HOME="$(pwd)"/airflow/' >> /entrypoint.sh && \ echo "airflow config list 1> /dev/null" >> /entrypoint.sh && \ echo "sed -i ./airflow/airflow.cfg -e 's/sqlite:.*/mysql+mysqldb:\/\/airflow_user:airflow_pass@localhost:3306\/airflow_db/'" >> /entrypoint.sh && \ 
echo 'airflow db migrate' >> /entrypoint.sh && \ diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index d07cdb22..94caf68c 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -42,6 +42,7 @@ def __init__(self, self.script = f""" from __future__ import annotations +import os from datetime import datetime from airflow.models.dag import DAG from airflow.operators.bash import BashOperator @@ -72,6 +73,7 @@ def translate(self, output_folder: pathlib.Path) -> None: task_id="{task.task_id}", depends_on_past=False, bash_command='{self.task_commands[task.task_id]}', + env={{"AIRFLOW_HOME": os.environ["AIRFLOW_HOME"]}}, retries=3, ) """ @@ -96,16 +98,17 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None: for task in self.tasks.values(): # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files] # output_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.output_files] - program = output_folder.joinpath(f'bin/{task.program}') + # program = "${AIRFLOW_HOME}/dags/" / output_folder / f"bin/{task.program}" + program = task.program args = [] for a in task.args: if "--output-files" in a: flag, output_files_dict = a.split(" ", 1) - output_files_dict = {str(output_folder.joinpath(f"data/{key}")): value for key, value in ast.literal_eval(output_files_dict).items()} + output_files_dict = {str("${AIRFLOW_HOME}/dags/" / output_folder / f"data/{key}"): value for key, value in ast.literal_eval(output_files_dict).items()} a = f"{flag} {json.dumps(output_files_dict)}" elif "--input-files" in a: flag, input_files_arr = a.split(" ", 1) - input_files_arr = [str(output_folder.joinpath(f"data/{file}")) for file in ast.literal_eval(input_files_arr)] + input_files_arr = [str("${AIRFLOW_HOME}/dags/" / output_folder / f"data/{file}") for file in ast.literal_eval(input_files_arr)] a = f"{flag} {json.dumps(input_files_arr)}" else: a = 
a.replace("'", "\"") From 0dd9c8e052a820137d998b90df80d20185d7b9c8 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 14 Apr 2025 15:49:51 -1000 Subject: [PATCH 10/15] Updated README file --- tests/translators/airflow/Dockerfile_Airflow | 2 +- tests/translators/airflow/README | 89 ++++++++++++++++---- 2 files changed, 73 insertions(+), 18 deletions(-) diff --git a/tests/translators/airflow/Dockerfile_Airflow b/tests/translators/airflow/Dockerfile_Airflow index f6d63774..48394454 100644 --- a/tests/translators/airflow/Dockerfile_Airflow +++ b/tests/translators/airflow/Dockerfile_Airflow @@ -21,7 +21,7 @@ RUN apt-get -y install cmake-data RUN apt-get -y install sudo RUN apt-get -y install vim --fix-missing RUN apt-get -y install gcc -RUN apt-get -y install gcc-multilib +#RUN apt-get -y install gcc-multilib # Python stuff RUN apt-get -y install python3 python3-pip diff --git a/tests/translators/airflow/README b/tests/translators/airflow/README index b6605365..c151ebac 100644 --- a/tests/translators/airflow/README +++ b/tests/translators/airflow/README @@ -1,43 +1,98 @@ -Download wfcommons: - pip install wfcommons +This README file describes steps to install/run Airflow, and then run a translated workflow. + +There are three sections: + - Installing Airflow on bare-metal + - Installing Airflow via Docker + - Running a translated workflow + + +Install Airflow on bare-metal +------------------------------ + +1. Install Airflow -Download airflow with: pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt" -Download MySQL and MySQLClient: +2. Install MySQL and MySQLClient + apt-get -y install pkg-config apt-get install -y mysql-server apt-get install -y python3-dev build-essential apt-get install -y default-libmysqlclient-dev pip install mysqlclient -Setup database for Airflow: +3. 
Setup database for Airflow + mysqld --explicit-defaults-for-timestamp & - In MySQL: + In MySQL client, type the following: CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass'; GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user'; -Set airflow home directory: +4. Set env variable for Airflow's home directory + export AIRFLOW_HOME="$(pwd)" -Edit AIRFLOW_HOME/airflow.cfg (may need to run 'airflow dags list' to make the file appear): +5. Edit $AIRFLOW_HOME/airflow.cfg (may need to run `airflow dags list` to create this file in the first place) + + Update the "sql_alchemy_conn = ..." line to be: + sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost:3306/airflow_db -Finish setting up database: +6. Finish setting up the database + airflow db migrate -Move the translated workflow.py file to AIRFLOW_HOME/dags/ -Run workflow: - airflow dags test {workflow_name} +Installing Airflow via Docker +----------------------------- + +A much simpler alternative is to use Docker. + +1. Build the docker image + + docker build -t wfcommons-dev -f Dockerfile_Airflow . + + (if building on a Mac, add the `--platform linux/amd64` argument after build above) + +2. 
Run the docker container in the directory to contains the translated workflow (see last section below) + + docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash + + +Running a translated workflow with Airflow +------------------------------------------- + +Assuming that you have run the airflow translator, for instance, using this Python code: + +``` +import pathlib + +from wfcommons import BlastRecipe +from wfcommons.wfbench import WorkflowBenchmark, AirflowTranslator + +# create a workflow benchmark object to generate specifications based on a recipe +benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=45) + +# generate a specification based on performance characteristics +benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) + +# generate an Airflow workflow +translator = AirflowTranslator(benchmark.workflow) +translator.translate(output_folder=pathlib.Path("/tmp/translated_workflow/")) +``` + +The above will create a JSON worfklow file in /tmp/blast-benchmark-45.json. +In that file, the workflow name (this is used below) is set to +"Blast-Benchmark". The above will also create the translated workflow the +/tmp/translated_workflow/ directory. Some directories and files need to be copied/moved as follows: + + cp -r /tmp/translated_workflow/ $AIRFLOW_HOME/dags/ + mv $AIRFLOW_HOME/dags/translated_workflow/workflow.py $AIRFLOW_HOME/dags/ +Finally, run the workflow as: + airflow dags test Blast-Benchmark (note the "Blast-Benchmark" workflow name from above) -(and running the translator) -Initialize the AirflowTranslator with the workflow to translate and the directory that will contain the workflow's input files. -Build the Airflow Docker container and run it in the directory with the translated dag. (Make sure there isn't a pre-existing 'airflow' directory.) - docker build -t wfcommons-dev -f Dockerfile_Airflow .
- docker run -it --rm -v .:/home/wfcommons wfcommons-dev /bin/bash \ No newline at end of file From ea69721f5e79fc0bd1eaf3ee28f1b62dcea1a8f0 Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 14 Apr 2025 16:07:17 -1000 Subject: [PATCH 11/15] Path fix --- wfcommons/wfbench/translator/airflow.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index 94caf68c..8af7d4ed 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -95,6 +95,8 @@ def translate(self, output_folder: pathlib.Path) -> None: def _prep_commands(self, output_folder: pathlib.Path) -> None: self.task_commands = {} + + for task in self.tasks.values(): # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files] # output_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.output_files] @@ -104,11 +106,11 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None: for a in task.args: if "--output-files" in a: flag, output_files_dict = a.split(" ", 1) - output_files_dict = {str("${AIRFLOW_HOME}/dags/" / output_folder / f"data/{key}"): value for key, value in ast.literal_eval(output_files_dict).items()} + output_files_dict = {str(f"${{AIRFLOW_HOME}}/dags/{output_folder.name}/data/{key}"): value for key, value in ast.literal_eval(output_files_dict).items()} a = f"{flag} {json.dumps(output_files_dict)}" elif "--input-files" in a: flag, input_files_arr = a.split(" ", 1) - input_files_arr = [str("${AIRFLOW_HOME}/dags/" / output_folder / f"data/{file}") for file in ast.literal_eval(input_files_arr)] + input_files_arr = [str(f"${{AIRFLOW_HOME}}/dags/{output_folder.name}/data/{file}") for file in ast.literal_eval(input_files_arr)] a = f"{flag} {json.dumps(input_files_arr)}" else: a = a.replace("'", "\"") @@ -134,4 +136,4 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None: # command_str 
= re.sub(r"([\w\-]+\.txt)", # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", # command_str) - # self.task_commands[task["id"]] = command_str \ No newline at end of file + # self.task_commands[task["id"]] = command_str From c38a5f3267e7d05711876bafa8ac0204210ca6d9 Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Sun, 20 Apr 2025 14:12:21 -1000 Subject: [PATCH 12/15] Added escape characters to translated bash_command strings. --- wfcommons/wfbench/translator/airflow.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index 8af7d4ed..2f8dd969 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -95,7 +95,7 @@ def translate(self, output_folder: pathlib.Path) -> None: def _prep_commands(self, output_folder: pathlib.Path) -> None: self.task_commands = {} - + for task in self.tasks.values(): # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files] @@ -117,12 +117,17 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None: args.append(a) command_str = " ".join([str(program)] + args) - # Prepends { and } with \" (i.e. 
{hi} -> \"{hi\"} - # command_str = re.sub(r"(\{|\})", r"\"\1", command_str) - # Prepends txt filenames with absolute path - # command_str = re.sub(r"([\w\-]+\.txt)", - # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", - # command_str) + + # Escapes all double quotes + command_str = command_str.replace('"', '\\\\"') + + # Wraps --output-files and --input-files arguments in double quotes + command_str = re.sub( + r'(--output-files) (\{.*\}) (--input-files) (\[.*?\])', + lambda m: f'{m.group(1)} "{m.group(2)}" {m.group(3)} "{m.group(4)}"', + command_str + ) + self.task_commands[task.task_id] = command_str From d2e644814567ec60b1f995cb2e95672c0a3367ce Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 21 Apr 2025 13:42:31 -1000 Subject: [PATCH 13/15] README file touchups --- bin/wfbench | 473 ------------------------------- tests/translators/airflow/README | 12 +- 2 files changed, 8 insertions(+), 477 deletions(-) delete mode 100755 bin/wfbench diff --git a/bin/wfbench b/bin/wfbench deleted file mode 100755 index 05c2fdb5..00000000 --- a/bin/wfbench +++ /dev/null @@ -1,473 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copyright (c) 2021-2025 The WfCommons Team. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. 
- -import os -import pathlib -import subprocess -import time -import sys -import signal -import queue -import argparse -import re -import json -import logging -import pandas as pd - -from io import StringIO -from filelock import FileLock -from pathos.helpers import mp as multiprocessing -from typing import List, Optional - - -# Configure logging -logging.basicConfig( - level=logging.INFO, # Change this to control the verbosity - format="[WfBench][%(asctime)s][%(levelname)s] %(message)s", - datefmt="%H:%M:%S", - handlers=[logging.StreamHandler()] -) - - -this_dir = pathlib.Path(__file__).resolve().parent - - -def log_info(msg: str): - """ - Log an info message to stderr - - :param msg: - :type msg: str - """ - logging.info(msg) - -def log_debug(msg: str): - """ - Log a debug message to stderr - - :param msg: - :type msg: str - """ - logging.debug(msg) - -def log_error(msg: str): - """ - Log a error message to stderr - - :param msg: - :type msg: str - """ - logging.error(msg) - - -def lock_core(path_locked: pathlib.Path, - path_cores: pathlib.Path) -> int: - """ - Lock cores in use. - - :param path_locked: - :type path_locked: pathlib.Path - :param path_cores: - :type path_cores: pathlib.Path - - :return: - :rtype: int - """ - all_cores = set(range(os.cpu_count())) - path_locked.touch(exist_ok=True) - path_cores.touch(exist_ok=True) - - while True: - with FileLock(path_locked) as lock: - try: - lock.acquire() - taken_cores = { - int(line) for line in path_cores.read_text().splitlines() if line.strip() - } - available = all_cores - taken_cores - if available: - core = available.pop() - taken_cores.add(core) - path_cores.write_text("\n".join(map(str, taken_cores))) - return core - - log_debug("All Cores are taken") - finally: - lock.release() - time.sleep(1) - - -def unlock_core(path_locked: pathlib.Path, - path_cores: pathlib.Path, - core: int) -> None: - """ - Unlock cores after execution is done. 
- - :param path_locked: - :type path_locked: pathlib.Path - :param path_cores: - :type path_cores: pathlib.Path - :param core: - :type core: int - """ - with FileLock(path_locked) as lock: - lock.acquire() - try: - taken_cores = { - int(line) for line in path_cores.read_text().splitlines() - if int(line) != core - } - path_cores.write_text("\n".join(map(str, taken_cores))) - finally: - lock.release() - -def monitor_progress(proc, cpu_queue): - """Monitor progress from the CPU benchmark process.""" - for line in iter(proc.stdout.readline, ""): # No decode needed - line = line.strip() - if line.startswith("Progress:"): - try: - progress = float(line.split()[1].strip('%')) - cpu_queue.put(progress) - except (ValueError, IndexError): - pass - -def cpu_mem_benchmark(cpu_queue: multiprocessing.Queue, - cpu_threads: Optional[int] = 5, - mem_threads: Optional[int] = 5, - cpu_work: Optional[int] = 100, - core: Optional[int] = None, - total_mem: Optional[int] = None) -> List: - """ - Run CPU and memory benchmark. - - :param cpu_queue: Queue to push CPU benchmark progress as a float. - :type cpu_queue: multiprocessing.Queue - :param cpu_threads: Number of threads for CPU benchmark. - :type cpu_threads: Optional[int] - :param mem_threads: Number of threads for memory benchmark. - :type mem_threads: Optional[int] - :param cpu_work: Total work units for CPU benchmark. - :type cpu_work: Optional[int] - :param core: Core to pin the benchmark processes to. - :type core: Optional[int] - :param total_mem: Total memory to use for memory benchmark. - :type total_mem: Optional[float] - - :return: Lists of CPU and memory subprocesses. 
- :rtype: List - """ - total_mem = f"{total_mem}B" if total_mem else f"{100.0 / os.cpu_count()}%" - cpu_work_per_thread = int(cpu_work / cpu_threads) - - cpu_procs = [] - mem_procs = [] - cpu_prog = [f"{this_dir.joinpath('cpu-benchmark')}", f"{cpu_work_per_thread}"] - mem_prog = ["stress-ng", "--vm", f"{mem_threads}", - "--vm-bytes", f"{total_mem}", "--vm-keep"] - - for i in range(cpu_threads): - cpu_proc = subprocess.Popen(cpu_prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - - # NOTE: might be a good idea to use psutil to set the affinity (works across platforms) - if core: - os.sched_setaffinity(cpu_proc.pid, {core}) - cpu_procs.append(cpu_proc) - - # Start a thread to monitor the progress of each CPU benchmark process - monitor_thread = multiprocessing.Process(target=monitor_progress, args=(cpu_proc, cpu_queue)) - monitor_thread.start() - - if mem_threads > 0: - # NOTE: add a check to use creationflags=subprocess.CREATE_NEW_PROCESS_GROUP for Windows - mem_proc = subprocess.Popen(mem_prog, preexec_fn=os.setsid) - if core: - os.sched_setaffinity(mem_proc.pid, {core}) - mem_procs.append(mem_proc) - - return cpu_procs, mem_procs - - -def io_read_benchmark_user_input_data_size(inputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = -1 - memory_limit = int(memory_limit) - log_debug("Starting IO Read Benchmark...") - for file, size in inputs.items(): - with open(rundir.joinpath(file), "rb") as fp: - log_debug(f"Reading '{file}'") - chunk_size = min(size, memory_limit) - while fp.read(chunk_size): - pass - log_debug("Completed IO Read Benchmark!") - - -def io_write_benchmark_user_input_data_size(outputs, - rundir=None, - memory_limit=None): - if memory_limit is None: - memory_limit = sys.maxsize - memory_limit = int(memory_limit) - for file_name, file_size in outputs.items(): - log_debug(f"Writing output file '{file_name}'") - file_size_todo = file_size - while file_size_todo > 0: - with open(rundir.joinpath(file_name), 
"ab") as fp: - chunk_size = min(file_size_todo, memory_limit) - file_size_todo -= fp.write(os.urandom(int(chunk_size))) - - -def io_alternate(inputs, outputs, cpu_queue: multiprocessing.Queue, memory_limit=None, rundir=None, event=None): - """Alternate between reading and writing to a file, ensuring read only happens after write.""" - - if memory_limit is None: - memory_limit = 10 * 1024 * 1024 # sys.maxsize - memory_limit = int(memory_limit) - - # queue will have messages in the form (cpu_percent_completed) - # Get the last message and trash the rest - - # Create empty files - for name in outputs: - open(rundir.joinpath(name), "wb").close() - - io_completed = 0 - bytes_read = { - name: 0 - for name in inputs - } - bytes_written = { - name: 0 - for name in outputs - } - - # get size of inputs - inputs = { - name: os.path.getsize(rundir.joinpath(name)) - for name in inputs - } - - while io_completed < 100: - cpu_percent = max(io_completed, cpu_queue.get()) - while True: # Get the last message - try: - cpu_percent = max(io_completed, cpu_queue.get_nowait()) - except queue.Empty: - break - - log_debug(f"CPU Percent: {cpu_percent}") - if cpu_percent: - bytes_to_read = { - name: int(size * (cpu_percent / 100) - bytes_read[name]) - for name, size in inputs.items() - } - bytes_to_write = { - name: int(size * (cpu_percent / 100) - bytes_written[name]) - for name, size in outputs.items() - } - io_read_benchmark_user_input_data_size(bytes_to_read, rundir, memory_limit=memory_limit) - io_write_benchmark_user_input_data_size(bytes_to_write, rundir, memory_limit=memory_limit) - - bytes_read = { - name: bytes_read[name] + bytes_to_read[name] - for name in bytes_to_read - } - bytes_written = { - name: bytes_written[name] + bytes_to_write[name] - for name in bytes_to_write - } - - log_debug(f"Bytes Read: {bytes_read}") - log_debug(f"Bytes Written: {bytes_written}") - - io_completed = cpu_percent - - if io_completed >= 100: - break - -def get_available_gpus(): - proc = 
subprocess.Popen(["nvidia-smi", "--query-gpu=utilization.gpu", "--format=csv"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, _ = proc.communicate() - df = pd.read_csv(StringIO(stdout.decode("utf-8")), sep=" ") - return df[df["utilization.gpu"] <= 5].index.to_list() - - -def gpu_benchmark(time: int = 100, - work: int = 100, - device: int = 0): #work, device - - gpu_prog = [f"CUDA_DEVICE_ORDER=PCI_BUS_ID CUDA_VISIBLE_DEVICES={device} {this_dir.joinpath('./gpu_benchmark')} {work} {time}"] - log_debug(f"Running GPU Benchmark: {gpu_prog}") - subprocess.Popen(gpu_prog, shell=True) - - -def get_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser() - parser.add_argument("--name", default=None, required=True, help="Task name.") - parser.add_argument("--rundir", help="Run directory.") - parser.add_argument("--percent-cpu", default=0.5, type=float, - help="percentage related to the number of CPU threads.") - parser.add_argument("--path-lock", default=None, help="Path to lock file.") - parser.add_argument("--path-cores", default=None, help="Path to cores file.") - parser.add_argument("--cpu-work", default=None, help="Amount of CPU work.") - parser.add_argument("--gpu-work", default=None, help="Amount of GPU work.") - parser.add_argument("--time", default=None, help="Time limit (in seconds) to complete the task (overrides CPU and GPU works).") - parser.add_argument("--mem", type=float, default=None, help="Max amount (in MB) of memory consumption.") - parser.add_argument("--output-files", help="output file names with sizes in bytes as a JSON dictionary " - "(e.g., --output-files {\\\"file1\\\": 1024, \\\"file2\\\": 2048}).") - parser.add_argument("--input-files", help="input files names as a JSON array " - "(e.g., --input-files [\\\"file3\\\", \\\"file4\\\"]).") - parser.add_argument("--debug", action="store_true", help="Enable debug messages.") - - return parser - - -def main(): - """Main program.""" - parser = get_parser() - args = 
parser.parse_args() - core = None - - if args.debug: - logging.getLogger().setLevel(logging.DEBUG) - - if args.rundir: - rundir = pathlib.Path(args.rundir) - else: - rundir = pathlib.Path(os.getcwd()) - - if args.path_lock and args.path_cores: - path_locked = pathlib.Path(args.path_lock) - path_cores = pathlib.Path(args.path_cores) - core = lock_core(path_locked, path_cores) - - log_info(f"Starting {args.name} Benchmark") - - mem_bytes = args.mem * 1024 * 1024 if args.mem else None - - procs = [] - io_proc = None - outputs_dict = {} - - cpu_queue = multiprocessing.Queue() - - log_debug(f"Working directory: {os.getcwd()}") - - # Deal with input/output files if any - cleaned_input = "{}" if args.input_files is None else re.sub(r'\\+', '', args.input_files) - cleaned_output = "{}" if args.output_files is None else re.sub(r'\\+', '', args.output_files) - # print("CLEANED INPUT", cleaned_input) - # print("CLEANED OUTPUT", cleaned_output) - - if cleaned_input or cleaned_output: - log_debug("Starting IO benchmark...") - - # Attempt to parse the cleaned string - try: - outputs_dict = json.loads(cleaned_output) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --output-files JSON string argument: {e}") - sys.exit(1) - - try: - inputs_array = json.loads(cleaned_input) - except json.JSONDecodeError as e: - log_error(f"Failed to decode --input-files JSON string argument: {e}") - sys.exit(1) - - # print("OUTPUT", outputs_dict) - # print("INPUTS", inputs_array) - - # Create a multiprocessing event that in the first run is set to True - write_done_event = multiprocessing.Event() - # Set this to True to allow the first read to happen - write_done_event.set() - # Print the value of the event - # print("Event Value:", write_done_event.is_set()) - - io_proc = multiprocessing.Process( - target=io_alternate, - args=(inputs_array, outputs_dict, cpu_queue, mem_bytes, rundir, write_done_event) - ) - io_proc.start() - procs.append(io_proc) - - if args.gpu_work: - 
log_info(f"Starting GPU Benchmark for {args.name}...") - available_gpus = get_available_gpus() #checking for available GPUs - - if not available_gpus: - log_error("No GPU available") - sys.exit(1) - else: - device = available_gpus[0] - log_debug(f"Running on GPU {device}") - - if args.time: - log_debug(f" Time:{args.time}, Work:{args.gpu_work}, Device:{device}") - gpu_benchmark(time=int(args.time), work=int(args.gpu_work), device=device) - else: - gpu_benchmark(work=int(args.gpu_work), device=device) - - if args.cpu_work: - log_info(f"Starting CPU and Memory Benchmarks for {args.name}...") - if core: - log_debug(f"{args.name} acquired core {core}") - - mem_threads=int(10 - 10 * args.percent_cpu) - cpu_procs, mem_procs = cpu_mem_benchmark(cpu_queue=cpu_queue, - cpu_threads=int(10 * args.percent_cpu), - mem_threads=mem_threads, - cpu_work=sys.maxsize if args.time else int(args.cpu_work), - core=core, - total_mem=mem_bytes) - - procs.extend(cpu_procs) - if args.time: - time.sleep(int(args.time)) - for proc in procs: - os.killpg(os.getpgid(proc.pid), signal.SIGTERM) - else: - for proc in procs: - if isinstance(proc, subprocess.Popen): - proc.wait() - if io_proc is not None and io_proc.is_alive(): - # io_proc.terminate() - io_proc.join() - - for mem_proc in mem_procs: - try: - os.kill(mem_proc.pid, signal.SIGKILL) # Force kill if SIGTERM fails - except subprocess.TimeoutExpired: - log_debug("Memory process did not terminate; force-killing.") - # As a fallback, use pkill if any remaining instances are stuck - subprocess.Popen(["pkill", "-f", "stress-ng"]).wait() - - log_debug("Completed CPU and Memory Benchmarks!") - - # NOTE: If you would like to run only IO add time.sleep(2) - # Check if all procs are done, if not, kill them - log_debug("Checking if all processes are done...") - for proc in procs: - if isinstance(proc, multiprocessing.Process): - if proc.is_alive(): - proc.terminate() - proc.join() - if isinstance(proc, subprocess.Popen): - proc.wait() - - if core: - 
unlock_core(path_locked, path_cores, core) - log_info(f"Benchmark {args.name} completed!") - -if __name__ == "__main__": - main() diff --git a/tests/translators/airflow/README b/tests/translators/airflow/README index c151ebac..d18deeb5 100644 --- a/tests/translators/airflow/README +++ b/tests/translators/airflow/README @@ -1,4 +1,5 @@ -This README file describes steps to install/run Airflow, and then run a translated workflow. +This README file describes steps to install/run Airflow, and then run a +translated workflow. There are three sections: - Installing Airflow on bare-metal @@ -55,7 +56,8 @@ A much simpler alternative is to use Docker. (if building on a Mac, add the `--platform linux/amd64` argument after build above) -2. Run the docker container in the directory to contains the translated workflow (see last section below) +2. Run the docker container in the directory that contains the translated + workflow (see last section below) docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash @@ -77,14 +79,16 @@ benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=45) # generate a specification based on performance characteristics benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) -# generate an Airflow workflow +# generate an Airflow workflow translator = AirflowTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("/tmp/translated_workflow/")) ``` The above will create a JSON worfklow file in /tmp/blast-benchmark-45.json. In that file, the workflow name (this is used below) is set to -"Blast-Benchmark". The above will also create the translated workflow the +"Blast-Benchmark". + +The above will also create the translated workflow in the /tmp/translated_workflow/ directory.
Some directories and files need to be copied/moved as follows: cp -r /tmp/translated_workflow/ $AIRFLOW_HOME/dags/ From ccb5f24849cbc334d9ef5bedbf042b3db1816aab Mon Sep 17 00:00:00 2001 From: Henri Casanova Date: Mon, 21 Apr 2025 13:43:00 -1000 Subject: [PATCH 14/15] typo-- --- tests/translators/README | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/translators/README b/tests/translators/README index 5bea7202..ee8377ad 100644 --- a/tests/translators/README +++ b/tests/translators/README @@ -1 +1 @@ -For now these are individual Dockerfiles for each workflow system, which can be used for local testing/development. The idea is to eventually merge them all (if possible), and great a Github action for testing all translators. +For now these are individual Dockerfiles for each workflow system, which can be used for local testing/development. The idea is to eventually merge them all (if possible), and create a Github action for testing all translators. From 677384f22c81b28febaad972325c30d897848e8a Mon Sep 17 00:00:00 2001 From: sukaryo-heilscher Date: Mon, 21 Apr 2025 13:50:54 -1000 Subject: [PATCH 15/15] Cleaned up comments. 
--- wfcommons/wfbench/translator/airflow.py | 29 ++++++------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/wfcommons/wfbench/translator/airflow.py b/wfcommons/wfbench/translator/airflow.py index 2f8dd969..c6309425 100644 --- a/wfcommons/wfbench/translator/airflow.py +++ b/wfcommons/wfbench/translator/airflow.py @@ -31,14 +31,10 @@ class AirflowTranslator(Translator): def __init__(self, workflow: Union[Workflow, pathlib.Path], - logger: Optional[Logger] = None, - # input_file_directory: pathlib.Path = pathlib.Path("/") - ) -> None: + logger: Optional[Logger] = None) -> None: """Create an object of the translator.""" super().__init__(workflow, logger) - # self.input_file_directory = input_file_directory - # self.script = f""" from __future__ import annotations @@ -93,14 +89,15 @@ def translate(self, output_folder: pathlib.Path) -> None: self._generate_input_files(output_folder) def _prep_commands(self, output_folder: pathlib.Path) -> None: - self.task_commands = {} - + """ + Prepares the bash_command strings for the BashOperators. + :param output_folder: The name of the output folder. + :type output_folder: pathlib.Path + """ + self.task_commands = {} for task in self.tasks.values(): - # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files] - # output_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.output_files] - # program = "${AIRFLOW_HOME}/dags/" / output_folder / f"bin/{task.program}" program = task.program args = [] for a in task.args: @@ -130,15 +127,3 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None: self.task_commands[task.task_id] = command_str - - # def _prep_commands(self): - # self.task_commands = {} - # for task in self.workflow.workflow_json["workflow"]["execution"]["tasks"]: - # command_str = " ".join([task["command"]["program"]] + task["command"]["arguments"]) - # # Prepends { and } with \" (i.e. 
{hi} -> \"{hi\"} - # command_str = re.sub(r"(\{|\})", r"\"\1", command_str) - # # Prepends txt filenames with absolute path - # command_str = re.sub(r"([\w\-]+\.txt)", - # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}", - # command_str) - # self.task_commands[task["id"]] = command_str