feat(pipeline): introducing airflow
pwambach authored and andreashelms committed Feb 5, 2024
1 parent 6879547 commit 8eb3635
Showing 8 changed files with 591 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -9,3 +9,7 @@
*.ipynb
.DS_Store
assets/legend-images/

pipeline/logs
pipeline/*/__pycache__
service-account.json
1 change: 1 addition & 0 deletions pipeline/.env
@@ -0,0 +1 @@
AIRFLOW_UID=501
12 changes: 12 additions & 0 deletions pipeline/Dockerfile
@@ -0,0 +1,12 @@
FROM apache/airflow:2.4.2

USER root

# GDAL from the ubuntugis PPA (the PPA is removed again after install),
# Node.js 16, and the native build dependencies for the node "canvas" package.
# All installs run as root, so no sudo is needed; the global npm install also
# happens here, before switching back to the unprivileged airflow user.
RUN apt update && apt install -y software-properties-common && add-apt-repository ppa:ubuntugis/ppa && apt update && apt install -y libgdal-dev gdal-bin && add-apt-repository -r ppa:ubuntugis/ppa
RUN curl -fsSL https://deb.nodesource.com/setup_16.x | bash - && apt install -y nodejs && rm -rf /var/lib/apt/lists/*
RUN apt update && apt install -y build-essential libcairo2-dev libpango1.0-dev libjpeg-dev libgif-dev librsvg2-dev
RUN npm install -g canvas

USER airflow

177 changes: 177 additions & 0 deletions pipeline/dags/cloud.cfc.py
@@ -0,0 +1,177 @@
import json
from pathlib import Path
from datetime import datetime, timedelta
from airflow import DAG, XComArg
from airflow.models.param import Param
from airflow.operators.bash import BashOperator
from airflow.decorators import task, task_group
import shared

# layer
LAYER_ID = 'cloud'
LAYER_VARIABLE = 'cfc'
LAYER_VERSION = '1.2.3'
LAYER_TYPE = "tiles"
MIN_VALUE = 0
MAX_VALUE = 10
MAX_ZOOM = 7
UNIT = 'mg/m2'
BASEMAP = 'blue'
TIME_FORMAT = {
"year": "numeric",
"month": "long"
}
LEGEND_VALUES = ["100 %", "0"]

# dev
NUM_FILES = 10
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
WORKDIR = '/tmp/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'


with DAG(dag_id="cloud.cfc", start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params={
    "max_files": Param(2, type="integer", minimum=0),
}) as dag:

    mk_workdir = BashOperator(
        task_id='mk_workdir',
        bash_command=f'rm -rf {WORKDIR} && mkdir -p {WORKDIR}'
    )

    @task()
    def list_files(**context):
        # gcs_list is a factory: the outer call builds the function, the trailing () runs it
        return shared.gcs_list(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, max_files=context["params"]["max_files"])()

    @task(task_id="download")
    def download(filename: str):
        return shared.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')(filename)

    gdal_info = BashOperator(
        task_id='gdal_info',
        bash_command='gdalinfo $FILEPATH_IN',
        env={"FILEPATH_IN": "{{ task_instance.xcom_pull(task_ids='download', key='return_value')[0] }}"}
    )
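    # Note: FILEPATH_IN above is a Jinja template, rendered at runtime. It pulls
    # the list of local paths returned by the mapped "download" task from XCom
    # and takes the first one, so gdalinfo inspects a single sample file.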



    @task()
    def upload(filename: str):
        return shared.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)

    @task()
    def upload_metadata(filename: str):
        return shared.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)

    @task()
    def upload_debug(filename: str):
        return shared.gcs_upload_file(bucket_name=BUCKET_TMP, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)

    @task()
    def create_metadata_json(file_list: list):
        metadata = {
            "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
            "minValue": MIN_VALUE,
            "maxValue": MAX_VALUE,
            "type": LAYER_TYPE,
            "zoomLevels": MAX_ZOOM,
            "timestamps": list(map(shared.filename_to_date, file_list)),
            "unit": UNIT,
            "basemap": BASEMAP,
            "timeFormat": TIME_FORMAT,
            "legendValues": LEGEND_VALUES
        }

        filepath = str(Path(WORKDIR).joinpath('metadata.json'))

        with open(filepath, "w") as f:
            f.write(json.dumps(metadata, indent=4))
        print(json.dumps(metadata, indent=4))

        return filepath
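    # For illustration (timestamps hypothetical, other values from the constants
    # above), the written metadata.json looks like:
    # {
    #     "id": "cloud.cfc",
    #     "minValue": 0,
    #     "maxValue": 10,
    #     "type": "tiles",
    #     "zoomLevels": 7,
    #     "timestamps": ["1982-01-01T00:00:00Z", "..."],
    #     "unit": "mg/m2",
    #     "basemap": "blue",
    #     "timeFormat": {"year": "numeric", "month": "long"},
    #     "legendValues": ["100 %", "0"]
    # }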


    # connect tasks
    files = list_files()

    mk_workdir >> files

    @task_group()
    def group_metadata():
        upload_metadata(create_metadata_json(files))

    @task_group()
    def group_legend_image():
        @task()
        def upload_legend(filename: str):
            return shared.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)(filename)

        legend_image = BashOperator(
            task_id='create_legend_png',
            bash_command='rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
            env={"FILEPATH_OUT": f'{WORKDIR}/legend.png', "COLOR_FILE": COLOR_FILE}
        )
        legend_image >> upload_legend(legend_image.output)

    group_metadata()
    mk_workdir >> group_legend_image()

    downloads = download.expand(filename=files)
    downloads >> gdal_info

    @task_group()
    def group_gdal(downloads):
        gdal_warp = BashOperator.partial(
            task_id='gdal_warp',
            bash_command='rm -f $FILEPATH_OUT && gdalwarp -t_srs EPSG:4326 -te -180 -90 180 90 -ts 1024 512 -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT && echo $FILEPATH_OUT'
        )

        gdal_dem = BashOperator.partial(
            task_id='gdal_dem',
            bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT'
        )

        gdal_translate = BashOperator.partial(
            task_id='gdal_translate',
            bash_command='rm -f $FILEPATH_OUT && gdal_translate -of PNG $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT'
        )

        gdal_warps = gdal_warp.expand(
            env=downloads.map(lambda filename: {
                "FILEPATH_IN": filename,
                "FILEPATH_OUT": shared.change_filename(filename, appendix='warped', suffix='tiff'),
                "DATA_VARIABLE": LAYER_VARIABLE
            })
        )

        gdal_dems = gdal_dem.expand(
            env=gdal_warps.output.map(lambda filename: {
                "FILEPATH_IN": filename,
                "FILEPATH_OUT": shared.change_filename(filename, appendix='colored'),
                "COLOR_FILE": COLOR_FILE
            })
        )

        gdal_translates = gdal_translate.expand(
            env=gdal_dems.output.map(lambda filename: {
                "FILEPATH_IN": filename,
                "FILEPATH_OUT": shared.change_filename(filename, appendix='', suffix='png'),
            })
        )

        return gdal_translates



# upload_debugs = upload_debug.expand(filename=gdal_dems.output)

uploads = upload.expand(filename=XComArg(group_gdal(downloads)))
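    # Resulting task graph (sketch):
    #   mk_workdir >> list_files >> download (mapped) >> gdal_info
    #   download >> group_gdal [gdal_warp >> gdal_dem >> gdal_translate, all mapped] >> upload (mapped)
    #   list_files >> group_metadata [create_metadata_json >> upload_metadata]
    #   mk_workdir >> group_legend_image [create_legend_png >> upload_legend]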

47 changes: 47 additions & 0 deletions pipeline/dags/shared.py
@@ -0,0 +1,47 @@
from pathlib import Path
from datetime import datetime
from airflow.providers.google.cloud.hooks.gcs import GCSHook


def only_filename(filepath: str):
    return Path(filepath).name

def change_filename(filepath: str, appendix: str = '', suffix: str = ''):
    if appendix and not appendix.startswith('_'): appendix = '_' + appendix
    if suffix and not suffix.startswith('.'): suffix = '.' + suffix

    p = Path(filepath)
    # strip the extension and any earlier appendix (e.g. "19820101_warped" -> "19820101")
    name = p.name.replace(p.suffix, '').split('_')[0]
    new_suffix = suffix or p.suffix
    # return a string so the result can be used directly in operator env dicts
    return str(p.with_name(name + appendix + new_suffix))
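# Example (hypothetical path): each processing step derives its output name
# from the date prefix of its input file:
#   change_filename('/tmp/files/19820101_downloaded.nc', appendix='warped', suffix='tiff')
#   -> '/tmp/files/19820101_warped.tiff'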


def gcs_download_file(bucket_name: str, dir: str, appendix: str = ''):
    def fn(filename: str):
        hook = GCSHook('google')
        local_filename = dir + '/' + only_filename(change_filename(filename, appendix))
        hook.download(bucket_name, filename, local_filename)
        return local_filename
    return fn

def gcs_upload_file(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str):
    def fn(local_filename: str):
        hook = GCSHook('google')
        remote_filename = f'{layer_id}.{layer_variable}/{layer_version}/{only_filename(local_filename)}'
        hook.upload(bucket_name, remote_filename, local_filename)
        return local_filename
    return fn

def gcs_list(bucket_name: str, layer_id: str, layer_variable: str, max_files: int = 0):
    def list_files():
        hook = GCSHook('google')
        filenames = hook.list(bucket_name, prefix=f'{layer_id}.{layer_variable}', delimiter='.nc')
        filenames = [f for f in filenames if f.endswith('.nc')]
        # sort before truncating so max_files always keeps the earliest files
        filenames.sort()
        if max_files > 0: filenames = filenames[:max_files]
        return filenames
    return list_files
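# Example (hypothetical bucket contents):
#   gcs_list('esa-cfs-cate-data', 'cloud', 'cfc', max_files=2)()
#   -> ['cloud.cfc/19820101.nc', 'cloud.cfc/19820201.nc']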

def filename_to_date(filename: str):
    date_string = filename.split('/')[-1].replace('.nc', '')
    return datetime.strptime(date_string, "%Y%m%d").isoformat() + 'Z'
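# Example: filename_to_date('cloud.cfc/19820101.nc') -> '1982-01-01T00:00:00Z'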