From 8eb36351c9f146e6629469aa47d18085d658ed4e Mon Sep 17 00:00:00 2001
From: Philipp Wambach
Date: Thu, 3 Nov 2022 10:58:44 +0100
Subject: [PATCH] feat(pipeline): introducing airflow

---
 .gitignore                                |   4 +
 pipeline/.env                             |   1 +
 pipeline/Dockerfile                       |  12 +
 pipeline/dags/cloud.cfc.py                | 177 ++++++++++++++
 pipeline/dags/shared.py                   |  47 ++++
 pipeline/docker-compose.yaml              | 278 ++++++++++++++++++++++
 pipeline/plugins/colors/cloud.cfc.txt     |   3 +
 pipeline/plugins/generate-legend-image.js |  69 ++++++
 8 files changed, 591 insertions(+)
 create mode 100644 pipeline/.env
 create mode 100644 pipeline/Dockerfile
 create mode 100644 pipeline/dags/cloud.cfc.py
 create mode 100644 pipeline/dags/shared.py
 create mode 100644 pipeline/docker-compose.yaml
 create mode 100644 pipeline/plugins/colors/cloud.cfc.txt
 create mode 100644 pipeline/plugins/generate-legend-image.js

diff --git a/.gitignore b/.gitignore
index dc3798b04..df6a52a53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,7 @@
 *.ipynb
 .DS_Store
 assets/legend-images/
+
+pipeline/logs
+pipeline/*/__pycache__
+service-account.json
\ No newline at end of file
diff --git a/pipeline/.env b/pipeline/.env
new file mode 100644
index 000000000..edd9cc75e
--- /dev/null
+++ b/pipeline/.env
@@ -0,0 +1 @@
+AIRFLOW_UID=501
diff --git a/pipeline/Dockerfile b/pipeline/Dockerfile
new file mode 100644
index 000000000..6e97b0191
--- /dev/null
+++ b/pipeline/Dockerfile
@@ -0,0 +1,12 @@
+FROM apache/airflow:2.4.2
+
+USER root
+
+RUN apt update && apt install -y software-properties-common && add-apt-repository ppa:ubuntugis/ppa && apt install -y libgdal-dev gdal-bin && sudo add-apt-repository -r ppa:ubuntugis/ppa
+RUN curl -fsSL https://deb.nodesource.com/setup_16.x | bash - && apt install -y nodejs && rm -rf /var/lib/apt/lists/*
+RUN apt update && sudo apt install -y build-essential libcairo2-dev libpango1.0-dev libjpeg-dev libgif-dev librsvg2-dev
+
+USER airflow
+
+RUN sudo npm install -g canvas
+
diff --git a/pipeline/dags/cloud.cfc.py b/pipeline/dags/cloud.cfc.py
new file mode 100644
index 000000000..bc27deb4a
--- /dev/null
+++ b/pipeline/dags/cloud.cfc.py
@@ -0,0 +1,177 @@
+import json
+from pathlib import Path
+from datetime import datetime, timedelta
+from airflow import DAG, XComArg
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+from airflow.decorators import task, task_group
+import shared
+
+# layer
+LAYER_ID = 'cloud'
+LAYER_VARIABLE = 'cfc'
+LAYER_VERSION = '1.2.3'
+LAYER_TYPE = "tiles"
+MIN_VALUE = 0
+MAX_VALUE = 10
+MAX_ZOOM = 7
+UNIT = 'mg/m2'
+BASEMAP = 'blue'
+TIME_FORMAT = {
+    "year": "numeric",
+    "month": "long"
+}
+LEGEND_VALUES = ["100 %", "0"]
+
+# dev
+NUM_FILES = 10
+BUCKET_ORIGIN = 'esa-cfs-cate-data'
+BUCKET_TMP = 'esa-cfs-pipeline-tmp'
+BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
+WORKDIR = '/tmp/files'
+COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
+
+
+with DAG(dag_id="cloud.cfc", start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params={
+        "max_files": Param(2, type="integer", minimum=0),
+    }) as dag:
+
+    mk_workdir = BashOperator(
+        task_id='mk_workdir',
+        bash_command=f'rm -rf {WORKDIR} && mkdir -p {WORKDIR}'
+    )
+
+
+
+    @task()
+    def list_files(**context):
+        return shared.gcs_list(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, max_files=context["params"]["max_files"])()
+
+    @task(task_id="download")
+    def download(filename: str):
+        return shared.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')(filename)
+
+    gdal_info = BashOperator(
+        task_id='gdal_info',
+        bash_command='gdalinfo $FILEPATH_IN',
+        env={"FILEPATH_IN": "{{ task_instance.xcom_pull(task_ids='download', key='return_value')[0] }}"}
+    )
+
+
+
+    @task()
+    def upload(filename: str):
+        return shared.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)
+
+    @task()
+    def upload_metadata(filename: str):
+        return shared.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)
+
+
+
+    @task()
+    def upload_debug(filename: str):
+        return shared.gcs_upload_file(bucket_name=BUCKET_TMP, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)
+
+    @task()
+    def create_metadata_json(file_list: str):
+        metadata = {
+            "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
+            "minValue": MIN_VALUE,
+            "maxValue": MAX_VALUE,
+            "type": LAYER_TYPE,
+            "zoomLevels": MAX_ZOOM,
+            "timestamps": list(map(shared.filename_to_date, file_list)),
+            "unit": UNIT,
+            "basemap": BASEMAP,
+            "timeFormat": TIME_FORMAT,
+            "legendValues": LEGEND_VALUES
+        }
+
+        filepath = str(Path(WORKDIR).joinpath('metadata.json'))
+
+        with open(filepath, "w") as f:
+            f.write(json.dumps(metadata, indent=4))
+            print(json.dumps(metadata, indent=4))
+
+        return filepath
+
+
+    # # connect tasks
+    files = list_files()
+
+    mk_workdir >> files
+
+    @task_group()
+    def group_metadata():
+        upload_metadata(create_metadata_json(files))
+
+
+    @task_group()
+    def group_legend_image():
+        @task()
+        def upload_legend(filename: str):
+            return shared.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)
+
+        legend_image = BashOperator(
+            task_id='create_legend_png',
+            bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
+            env={"FILEPATH_OUT": f'{WORKDIR}/legend.png', "COLOR_FILE": COLOR_FILE}
+        )
+        legend_image >> upload_legend(legend_image.output)
+
+    group_metadata()
+    mk_workdir >> group_legend_image()
+
+
+    downloads = download.expand(filename=files)
+    downloads >> gdal_info
+
+    @task_group()
+    def group_gdal(downloads):
+        gdal_warp = BashOperator.partial(
+            task_id='gdal_warp',
+            bash_command='rm -f $FILEPATH_OUT && gdalwarp -t_srs EPSG:4326 -te -180 -90 180 90 -ts 1024 512 -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT && echo $FILEPATH_OUT'
+        )
+
+        gdal_dem = BashOperator.partial(
+            task_id='gdal_dem',
+            bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT'
+        )
+
+        gdal_translate = BashOperator.partial(
+            task_id='gdal_translate',
+            bash_command='rm -f $FILEPATH_OUT && gdal_translate -of PNG $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT'
+        )
+
+        gdal_warps = gdal_warp.expand(
+            env=downloads.map(lambda filename: {
+                "FILEPATH_IN": filename,
+                "FILEPATH_OUT": shared.change_filename(filename, appendix='warped', suffix='tiff'),
+                "DATA_VARIABLE": LAYER_VARIABLE
+            })
+        )
+
+        gdal_dems = gdal_dem.expand(
+            env=gdal_warps.output.map(lambda filename: {
+                "FILEPATH_IN": filename,
+                "FILEPATH_OUT": shared.change_filename(filename, appendix='colored'),
+                "COLOR_FILE": COLOR_FILE
+            })
+        )
+
+        gdal_translates = gdal_translate.expand(
+            env=gdal_dems.output.map(lambda filename: {
+                "FILEPATH_IN": filename,
+                "FILEPATH_OUT": shared.change_filename(filename, appendix='', suffix='png'),
+            })
+        )
+
+        return gdal_translates
+
+
+
+    # upload_debugs = upload_debug.expand(filename=gdal_dems.output)
+
+    uploads = upload.expand(filename=XComArg(group_gdal(downloads)))
+
diff --git a/pipeline/dags/shared.py b/pipeline/dags/shared.py
new file mode 100644
index 000000000..688d88407
--- /dev/null
+++ b/pipeline/dags/shared.py
@@ -0,0 +1,47 @@
+from pathlib import Path
+from datetime import datetime, timedelta
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+
+
+def only_filename(filepath: str):
+    return Path(filepath).name
+
+def change_filename(filepath: str, appendix: str = '', suffix: str = ''):
+    if appendix and not appendix.startswith('_'): appendix = '_' + appendix
+    if suffix and not suffix.startswith('.'): suffix = '.' + suffix
+
+    p = Path(filepath)
+    name = p.name.replace(p.suffix, '').split('_')[0]
+    new_suffix = suffix or p.suffix
+    return p.with_name(name + appendix + new_suffix)
+
+
+def gcs_download_file(bucket_name: str, dir: str, appendix: str = ''):
+    def fn(filename: str):
+        hook = GCSHook('google')
+        local_filename = dir + '/' + only_filename(change_filename(filename, appendix))
+        hook.download(bucket_name, filename, local_filename)
+        return local_filename
+    return fn
+
+def gcs_upload_file(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str):
+    def fn(local_filename: str):
+        hook = GCSHook('google')
+        remote_filename = f'{layer_id}.{layer_variable}/{layer_version}/{only_filename(local_filename)}'
+        hook.upload(bucket_name, remote_filename, local_filename)
+        return local_filename
+    return fn
+
+def gcs_list(bucket_name: str, layer_id: str, layer_variable: str, max_files: int = 0):
+    def list_files():
+        hook = GCSHook('google')
+        filenames = hook.list(bucket_name, prefix=f'{layer_id}.{layer_variable}', delimiter='.nc')
+        filenames = [f for f in filenames if f.endswith('.nc')]
+        if (max_files > 0): filenames = filenames[:max_files]
+        filenames.sort()
+        return filenames
+    return list_files
+
+def filename_to_date(filename: str):
+    date_string = filename.split('/')[-1].replace('.nc', '')
+    return datetime.strptime(date_string, "%Y%m%d").isoformat() + 'Z'
\ No newline at end of file
diff --git a/pipeline/docker-compose.yaml b/pipeline/docker-compose.yaml
new file mode 100644
index 000000000..89adce0d8
--- /dev/null
+++ b/pipeline/docker-compose.yaml
@@ -0,0 +1,278 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
+#
+# WARNING: This configuration is for local development. Do not use it in a production deployment.
+#
+# This configuration supports basic configuration using environment variables or an .env file
+# The following variables are supported:
+#
+# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
+#                                Default: apache/airflow:2.4.2
+# AIRFLOW_UID                  - User ID in Airflow containers
+#                                Default: 50000
+# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
+#
+# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
+#                                Default: airflow
+# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
+#                                Default: airflow
+# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
+#                                Default: ''
+#
+# Feel free to modify this file to suit your needs.
+---
+version: '3'
+x-airflow-common:
+  &airflow-common
+  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
+  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
+  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
+  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.2}
+  build: .
+  environment:
+    &airflow-common-env
+    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
+    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
+    # For backward compatibility, with Airflow <2.3
+    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
+    AIRFLOW__CORE__FERNET_KEY: ''
+    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
+    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
+    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
+    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
+  volumes:
+    - ./dags:/opt/airflow/dags
+    - ./logs:/opt/airflow/logs
+    - ./plugins:/opt/airflow/plugins
+  user: "${AIRFLOW_UID:-50000}:0"
+  depends_on:
+    &airflow-common-depends-on
+    redis:
+      condition: service_healthy
+    postgres:
+      condition: service_healthy
+
+services:
+  postgres:
+    image: postgres:13
+    environment:
+      POSTGRES_USER: airflow
+      POSTGRES_PASSWORD: airflow
+      POSTGRES_DB: airflow
+    volumes:
+      - postgres-db-volume:/var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "airflow"]
+      interval: 5s
+      retries: 5
+    restart: always
+
+  redis:
+    image: redis:latest
+    expose:
+      - 6379
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 5s
+      timeout: 30s
+      retries: 50
+    restart: always
+
+  airflow-webserver:
+    <<: *airflow-common
+    command: webserver
+    ports:
+      - 8080:8080
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-scheduler:
+    <<: *airflow-common
+    command: scheduler
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-worker:
+    <<: *airflow-common
+    command: celery worker
+    healthcheck:
+      test:
+        - "CMD-SHELL"
+        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    environment:
+      <<: *airflow-common-env
+      # Required to handle warm shutdown of the celery workers properly
+      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
+      DUMB_INIT_SETSID: "0"
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-triggerer:
+    <<: *airflow-common
+    command: triggerer
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-init:
+    <<: *airflow-common
+    entrypoint: /bin/bash
+    # yamllint disable rule:line-length
+    command:
+      - -c
+      - |
+        function ver() {
+          printf "%04d%04d%04d%04d" $${1//./ }
+        }
+        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
+        airflow_version_comparable=$$(ver $${airflow_version})
+        min_airflow_version=2.2.0
+        min_airflow_version_comparable=$$(ver $${min_airflow_version})
+        if (( airflow_version_comparable < min_airflow_version_comparable )); then
+          echo
+          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
+          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
+          echo
+          exit 1
+        fi
+        if [[ -z "${AIRFLOW_UID}" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+          echo "If you are on Linux, you SHOULD follow the instructions below to set "
+          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
+          echo "For other operating systems you can get rid of the warning with manually created .env file:"
+          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
+          echo
+        fi
+        one_meg=1048576
+        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
+        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+        disk_available=$$(df / | tail -1 | awk '{print $$4}')
+        warning_resources="false"
+        if (( mem_available < 4000 )) ; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
+          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
+          echo
+          warning_resources="true"
+        fi
+        if (( cpus_available < 2 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
+          echo "At least 2 CPUs recommended. You have $${cpus_available}"
+          echo
+          warning_resources="true"
+        fi
+        if (( disk_available < one_meg * 10 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
+          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
+          echo
+          warning_resources="true"
+        fi
+        if [[ $${warning_resources} == "true" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
+          echo "Please follow the instructions to increase amount of resources available:"
+          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+          echo
+        fi
+        mkdir -p /sources/logs /sources/dags /sources/plugins
+        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
+        exec /entrypoint airflow version
+    # yamllint enable rule:line-length
+    environment:
+      <<: *airflow-common-env
+      _AIRFLOW_DB_UPGRADE: 'true'
+      _AIRFLOW_WWW_USER_CREATE: 'true'
+      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
+      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+      _PIP_ADDITIONAL_REQUIREMENTS: ''
+    user: "0:0"
+    volumes:
+      - .:/sources
+
+  airflow-cli:
+    <<: *airflow-common
+    profiles:
+      - debug
+    environment:
+      <<: *airflow-common-env
+      CONNECTION_CHECK_MAX_COUNT: "0"
+    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
+    command:
+      - bash
+      - -c
+      - airflow
+
+  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
+  # or by explicitly targeted on the command line e.g. docker-compose up flower.
+  # See: https://docs.docker.com/compose/profiles/
+  flower:
+    <<: *airflow-common
+    command: celery flower
+    profiles:
+      - flower
+    ports:
+      - 5555:5555
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
+      interval: 10s
+      timeout: 10s
+      retries: 5
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+volumes:
+  postgres-db-volume:
diff --git a/pipeline/plugins/colors/cloud.cfc.txt b/pipeline/plugins/colors/cloud.cfc.txt
new file mode 100644
index 000000000..abd110a42
--- /dev/null
+++ b/pipeline/plugins/colors/cloud.cfc.txt
@@ -0,0 +1,3 @@
+1 255 255 255 255
+0 255 255 255 0
+nv 0 0 0 0
diff --git a/pipeline/plugins/generate-legend-image.js b/pipeline/plugins/generate-legend-image.js
new file mode 100644
index 000000000..d1e91f8bc
--- /dev/null
+++ b/pipeline/plugins/generate-legend-image.js
@@ -0,0 +1,69 @@
+#!/usr/bin/env node
+
+// Create legend images from gdal color files
+
+const fs = require('fs');
+const {createCanvas} = require('/usr/lib/node_modules/canvas');
+
+const COLOR_FILE = process.env.COLOR_FILE
+const FILEPATH_OUT = process.env.FILEPATH_OUT
+const WIDTH = 1;
+const HEIGHT = 500;
+
+
+const canvas = createCanvas(WIDTH, HEIGHT);
+const ctx = canvas.getContext('2d');
+const colorRamp = readColorFile(COLOR_FILE);
+const gradient = ctx.createLinearGradient(0, 0, 0, HEIGHT);
+const max = colorRamp[0][0];
+const min = colorRamp[colorRamp.length - 1][0];
+const range = max - min;
+
+let lastColor = null;
+
+colorRamp.forEach(colorStop => {
+  const [value, r, g, b, a] = colorStop;
+  const stop = 1 - (value - min) / range;
+
+  const alpha = Number(a) / 255;
+  const hasAlpha = !isNaN(a);
+  const color = `rgb${hasAlpha ? 'a' : ''}(${r}, ${g}, ${b}${
+    hasAlpha ? `, ${alpha}` : ''
+  })`;
+
+  if (COLOR_FILE.includes('lccs_class') && lastColor) {
+    gradient.addColorStop(stop - 0.001, lastColor);
+  }
+
+  gradient.addColorStop(stop, color);
+
+  lastColor = color;
+});
+
+ctx.fillStyle = gradient;
+ctx.fillRect(0, 0, WIDTH, HEIGHT);
+
+writeImage(FILEPATH_OUT, canvas);
+
+function readColorFile(file) {
+  const content = fs.readFileSync(file, 'utf8');
+  const stops = content
+    .split('\n')
+    .filter(Boolean)
+    .filter(line => !line.startsWith('nv'))
+    .map(line => line.split(' '));
+
+  if (file.includes('lswt')) {
+    // do not include the "ice" color
+    return stops.filter(([v]) => v > -1000);
+  }
+
+  return stops;
+}
+
+function writeImage(file, canvas) {
+  const out = fs.createWriteStream(file);
+  const stream = canvas.createPNGStream();
+  stream.pipe(out);
+  out.on('finish', () => console.log(`Image was written to ${FILEPATH_OUT}`));
+}
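
Local usage note (not part of the patch itself): the compose file above is the stock Airflow 2.4.2 docker-compose with the image swapped for a `build: .` of pipeline/Dockerfile, and the GCS helpers in shared.py call GCSHook('google'), so an Airflow connection with conn id "google" must exist before the DAG can reach the buckets — presumably backed by the gitignored service-account.json; how that connection is provisioned is not covered here. A minimal bring-up sketch, assuming the commands are run from the pipeline/ directory:

    # build the extended image (GDAL, Node.js, node-canvas) declared in pipeline/Dockerfile
    docker-compose build

    # one-time init: runs the metadata DB migrations and creates the default airflow/airflow web user
    docker-compose up airflow-init

    # start webserver (http://localhost:8080), scheduler, worker, triggerer, postgres and redis
    docker-compose up -d

    # DAGs are paused at creation (AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'),
    # so unpause cloud.cfc and trigger it with its "max_files" param
    docker-compose exec airflow-webserver airflow dags unpause cloud.cfc
    docker-compose exec airflow-webserver airflow dags trigger cloud.cfc --conf '{"max_files": 2}'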