Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/translators/README
Original file line number Diff line number Diff line change
@@ -1 +1 @@
For now these are individual Dockerfiles for each workflow system, which can be used for local testing/development. The idea is to eventually merge them all (if possible), and great a Github action for testing all translators.
For now these are individual Dockerfiles for each workflow system, which can be used for local testing/development. The idea is to eventually merge them all (if possible), and create a Github action for testing all translators.
75 changes: 75 additions & 0 deletions tests/translators/airflow/Dockerfile_Airflow
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# docker build -t wfcommons-dev -f Dockerfile_Airflow .
# docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash

FROM ubuntu:noble

LABEL org.containers.image.authors="sukaryor@hawaii.edu"

# Refresh the apt index and install every system package in ONE layer.
# Keeping `apt-get update` and `apt-get install` in the same RUN prevents
# installing from a stale, cache-hit package index (Docker best practice),
# and collapsing the per-package RUNs shrinks the image's layer count.
# DEBIAN_FRONTEND=noninteractive lets tzdata install without prompting;
# the timezone file is written first so tzdata picks it up.
RUN echo "America/Los_Angeles" > /etc/timezone && \
    export DEBIAN_FRONTEND=noninteractive && \
    apt-get update && \
    apt-get install -y \
        tzdata \
        pkg-config git wget make cmake cmake-data sudo gcc \
        python3 python3-pip \
        stress-ng \
        mysql-server python3-dev build-essential default-libmysqlclient-dev && \
    apt-get install -y vim --fix-missing
#RUN apt-get -y install gcc-multilib

# Python stuff (Ubuntu noble's system Python requires --break-system-packages)
RUN python3 -m pip install --break-system-packages \
        pathos pandas filelock \
        networkx scipy matplotlib \
        pyyaml jsonschema requests
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1

# WfCommons
RUN python3 -m pip install --break-system-packages wfcommons

# Install Airflow, pinned via the official constraints file for Python 3.12
RUN python3 -m pip install --break-system-packages apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt"

# Python bindings for MySQL (server and client headers installed above)
RUN python3 -m pip install --break-system-packages mysqlclient

# Setup directory
RUN mkdir /home/wfcommons

# Create an entrypoint script to start mysqld in the background
# and setup the Airflow DB
RUN echo '#!/bin/bash' > /entrypoint.sh && \
    echo 'mysqld --explicit-defaults-for-timestamp &' >> /entrypoint.sh && \
    echo 'until mysqladmin ping -h localhost --silent; do' >> /entrypoint.sh && \
    echo '  echo "Waiting for MySQL to be ready..."' >> /entrypoint.sh && \
    echo '  sleep 2' >> /entrypoint.sh && \
    echo 'done' >> /entrypoint.sh && \
    echo 'echo "MySQL is ready!"' >> /entrypoint.sh && \
    echo 'echo "Setting up database for Airflow..."' >> /entrypoint.sh && \
    echo 'mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";' >> /entrypoint.sh && \
    echo "mysql -u root -e \"CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';\"" >> /entrypoint.sh && \
    echo "mysql -u root -e \"GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';\"" >> /entrypoint.sh && \
    echo 'export AIRFLOW_HOME="$(pwd)"/airflow/' >> /entrypoint.sh && \
    echo "airflow config list 1> /dev/null" >> /entrypoint.sh && \
    echo "sed -i ./airflow/airflow.cfg -e 's/sqlite:.*/mysql+mysqldb:\/\/airflow_user:airflow_pass@localhost:3306\/airflow_db/'" >> /entrypoint.sh && \
    echo 'airflow db migrate' >> /entrypoint.sh && \
    echo 'echo "Airflow database setup!"' >> /entrypoint.sh && \
    echo 'mkdir ./airflow/dags' >> /entrypoint.sh && \
    echo 'exec bash' >> /entrypoint.sh && \
    chmod +x /entrypoint.sh

WORKDIR /home/wfcommons

ENTRYPOINT ["/entrypoint.sh"]
102 changes: 102 additions & 0 deletions tests/translators/airflow/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
This README file describes steps to install/run Airflow, and then run a
translated workflow.

There are three sections:
- Installing Airflow on bare-metal
- Installing Airflow via Docker
- Running a translated workflow


Install Airflow on bare-metal
------------------------------

1. Install Airflow

pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt"

2. Install MySQL and MySQLClient

apt-get -y install pkg-config
apt-get install -y mysql-server
apt-get install -y python3-dev build-essential
apt-get install -y default-libmysqlclient-dev
pip install mysqlclient

3. Setup database for Airflow

mysqld --explicit-defaults-for-timestamp &
In MySQL client, type the following:
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';

4. Set env variable for Airflow's home directory

export AIRFLOW_HOME="$(pwd)"

5. Edit $AIRFLOW_HOME/airflow.cfg (may need to run `airflow dags list` to create this file in the first place)

Update the "sql_alchemy_conn = ..." line to be:

sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost:3306/airflow_db

6. Finish setting up the database

airflow db migrate


Installing Airflow via Docker
-----------------------------

A much simpler alternative is to use Docker.

1. Build the docker image

docker build -t wfcommons-dev -f Dockerfile_Airflow .

(if building on a Mac, add the `--platform linux/amd64` argument after build above)

2. Run the docker container in the directory that contains the translated
workflow (see last section below)

docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash


Running a translated workflow with Airflow
-------------------------------------------

Assuming that you have run the airflow translator, for instance, using this Python code:

```
import pathlib

from wfcommons import BlastRecipe
from wfcommons.wfbench import WorkflowBenchmark, AirflowTranslator

# create a workflow benchmark object to generate specifications based on a recipe
benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=45)

# generate a specification based on performance characteristics
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)

# generate an Airflow workflow
translator = AirflowTranslator(benchmark.workflow)
translator.translate(output_folder=pathlib.Path("/tmp/translated_workflow/"))
```

The above will create a JSON workflow file in /tmp/blast-benchmark-45.json.
In that file, the workflow name (this is used below) is set to
"Blast-Benchmark".

The above will also create the translated workflow in the
/tmp/translated_workflow/ directory. Some directories and files need to be copied/moved as follows:

cp -r /tmp/translated_workflow/ $AIRFLOW_HOME/dags/
mv $AIRFLOW_HOME/dags/translated_workflow/workflow.py $AIRFLOW_HOME/dags/

Finally, run the workflow as:

airflow dags test Blast-Benchmark (note that "Blast-Benchmark" is the workflow name from above)



2 changes: 1 addition & 1 deletion wfcommons/wfbench/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# (at your option) any later version.

from .bench import WorkflowBenchmark
from .translator import DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator, BashTranslator, PyCompssTranslator
from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator, BashTranslator, PyCompssTranslator
1 change: 1 addition & 0 deletions wfcommons/wfbench/translator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

from .airflow import AirflowTranslator
from .dask import DaskTranslator
from .nextflow import NextflowTranslator
from .parsl import ParslTranslator
Expand Down
129 changes: 129 additions & 0 deletions wfcommons/wfbench/translator/airflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021-2024 The WfCommons Team.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import pathlib
import re
import ast
import json

from logging import Logger
from typing import Optional, Union

from .abstract_translator import Translator
from ...common import Workflow

class AirflowTranslator(Translator):
    """
    A WfFormat parser for creating Airflow workflow applications.

    :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
    :type workflow: Union[Workflow, pathlib.Path],
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Logger
    """

    def __init__(self,
                 workflow: Union[Workflow, pathlib.Path],
                 logger: Optional[Logger] = None) -> None:
        """Create an object of the translator."""
        super().__init__(workflow, logger)

        # Header of the generated DAG file. translate() appends one
        # BashOperator per task, indented so it lands inside the
        # "with DAG(...) as dag:" block below.
        self.script = f"""
from __future__ import annotations

import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "{self.workflow.name}",
    description="airflow translation of a wfcommons instance",
    schedule="0 0 * * *",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["wfcommons"],
) as dag:
"""

    def translate(self, output_folder: pathlib.Path) -> None:
        """
        Translate a workflow benchmark description (WfFormat) into an Airflow workflow application.

        :param output_folder: The name of the output folder.
        :type output_folder: pathlib.Path
        """
        self._prep_commands(output_folder)

        # Emit one BashOperator per task. The 4-space indent inside the
        # f-string is required: the snippet must sit inside the generated
        # "with DAG(...) as dag:" block for workflow.py to be valid Python.
        # NOTE(review): task.task_id is used as a Python variable name in the
        # generated file — assumes WfBench task ids are valid identifiers.
        for task in self.tasks.values():
            self.script += f"""
    {task.task_id} = BashOperator(
        task_id="{task.task_id}",
        depends_on_past=False,
        bash_command='{self.task_commands[task.task_id]}',
        env={{"AIRFLOW_HOME": os.environ["AIRFLOW_HOME"]}},
        retries=3,
    )
"""
        # Declare dependencies only after every operator variable exists.
        for task in self.tasks.values():
            parents = ", ".join(self.task_parents[task.task_id])
            if parents:
                self.script += f"""
    [{parents}] >> {task.task_id}
"""
        # write benchmark files (raises FileExistsError if the folder exists,
        # so a stale translation is never silently overwritten)
        output_folder.mkdir(parents=True)
        with open(output_folder.joinpath("workflow.py"), "w") as fp:
            fp.write(self.script)

        # additional files
        self._copy_binary_files(output_folder)
        self._generate_input_files(output_folder)

    def _prep_commands(self, output_folder: pathlib.Path) -> None:
        """
        Prepares the bash_command strings for the BashOperators.

        Rewrites the --output-files/--input-files arguments so that every file
        path is anchored at ${AIRFLOW_HOME}/dags/<output_folder>/data/, then
        escapes the command for embedding in the generated single-quoted
        bash_command string.

        :param output_folder: The name of the output folder.
        :type output_folder: pathlib.Path
        """
        # task_id -> fully escaped command line for the generated DAG
        self.task_commands = {}

        for task in self.tasks.values():
            program = task.program
            args = []
            for a in task.args:
                if "--output-files" in a:
                    # argument payload is a Python-literal dict: {filename: size}
                    flag, output_files_dict = a.split(" ", 1)
                    output_files_dict = {str(f"${{AIRFLOW_HOME}}/dags/{output_folder.name}/data/{key}"): value for key, value in ast.literal_eval(output_files_dict).items()}
                    a = f"{flag} {json.dumps(output_files_dict)}"
                elif "--input-files" in a:
                    # argument payload is a Python-literal list of filenames
                    flag, input_files_arr = a.split(" ", 1)
                    input_files_arr = [str(f"${{AIRFLOW_HOME}}/dags/{output_folder.name}/data/{file}") for file in ast.literal_eval(input_files_arr)]
                    a = f"{flag} {json.dumps(input_files_arr)}"
                else:
                    a = a.replace("'", "\"")
                args.append(a)

            command_str = " ".join([str(program)] + args)

            # Escapes all double quotes: '\\"' survives the generated file's
            # single-quoted Python string so bash ultimately sees '\"'.
            command_str = command_str.replace('"', '\\\\"')

            # Wraps --output-files and --input-files arguments in double quotes.
            # NOTE(review): the pattern assumes --output-files always precedes
            # --input-files on the command line — confirm against wfbench.
            command_str = re.sub(
                r'(--output-files) (\{.*\}) (--input-files) (\[.*?\])',
                lambda m: f'{m.group(1)} "{m.group(2)}" {m.group(3)} "{m.group(4)}"',
                command_str
            )

            self.task_commands[task.task_id] = command_str