feat(pipeline): permafrost
pwambach authored and andreashelms committed Feb 5, 2024
1 parent e04ab10 commit 46d2d8f
Showing 6 changed files with 146 additions and 48 deletions.
8 changes: 6 additions & 2 deletions pipeline/Dockerfile
@@ -2,11 +2,15 @@ FROM apache/airflow:2.4.2

USER root

# gdal
RUN apt update && apt install -y software-properties-common && add-apt-repository ppa:ubuntugis/ppa && apt install -y libgdal-dev gdal-bin && sudo add-apt-repository -r ppa:ubuntugis/ppa
# gcloud
RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && apt-get update -y && apt-get install google-cloud-cli -y
# nodejs + node-canvas module
RUN curl -fsSL https://deb.nodesource.com/setup_16.x | bash - && apt install -y nodejs && rm -rf /var/lib/apt/lists/*
RUN apt update && sudo apt install -y build-essential libcairo2-dev libpango1.0-dev libjpeg-dev libgif-dev librsvg2-dev
RUN sudo npm install -g canvas

USER airflow

RUN sudo npm install -g canvas
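
Since the image now layers GDAL, the gcloud CLI, and node with node-canvas on top of the Airflow base, a small smoke test along these lines (a sketch, run inside the built container) can catch a missing tool before a DAG run does:

import shutil
import subprocess

# each binary below is installed by one of the RUN layers above
for tool in ('gdal_translate', 'gdal2tiles.py', 'gcloud', 'gsutil', 'node'):
    assert shutil.which(tool) is not None, f'{tool} is missing from PATH'

# both print a version banner when present
subprocess.run(['gdal_translate', '--version'], check=True)
subprocess.run(['node', '--version'], check=True)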

25 changes: 14 additions & 11 deletions pipeline/dags/cloud.cfc.py
@@ -7,13 +7,14 @@
LAYER_ID = 'cloud'
LAYER_VARIABLE = 'cfc'
LAYER_VERSION = '1.2.3'
RESOLUTION = '1024 512'
METADATA = {
"id": f'{LAYER_ID}.{LAYER_VARIABLE}',
"timestamps": [], # will be injected
"min_value": 0,
"max_value": 10,
"type": "tiles",
"zoom_levels": 7,
"type": "image", # 'tiles' or 'image'
"zoom_levels": '0-2',
"units": 'mg/m2',
"basemap": 'blue',
"legend_values": ["100 %", "0"],
@@ -24,12 +25,12 @@
}

# dev
NUM_FILES = 10
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
WORKDIR = '/tmp/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
DEBUG=False

dag_params = {
"max_files": Param(2, type="integer", minimum=0)
@@ -41,19 +42,21 @@
clean_workdir = task_factories.clean_workdir(workdir=WORKDIR)
list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
gdal_info = task_factories.gdal_info()
legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
gdal_transforms = task_factories.gdal_transforms(LAYER_VARIABLE, COLOR_FILE)
upload = task_factories.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)
gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
upload = task_factories.gcloud_upload_dir(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, directory=WORKDIR)

# connect tasks
files = list_files()
clean_workdir >> files
clean_workdir >> legend_image()
metadata(files)
downloads = download.expand(filename=files)
downloads >> gdal_info
final_images = gdal_transforms(downloads)
upload.expand(filename=final_images.output)
gdal_transforms(downloads) >> upload
clean_workdir >> legend_image
metadata(files)

if DEBUG:
downloads >> task_factories.gdal_info()
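
# Resulting task graph after the rewiring above (for review):
#   clean_workdir >> list_files >> download (mapped) >> gdal_transforms_group >> gcloud_upload
#   list_files >> metadata
#   clean_workdir >> legend_image
#   (with DEBUG=True, downloads additionally feed a gdal_info task)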



5 changes: 3 additions & 2 deletions pipeline/dags/helper.py
@@ -4,13 +4,14 @@
def only_filename(filepath: str):
return Path(filepath).name

def change_filename(filepath: str, appendix: str = '', suffix: str = ''):
def change_filename(filepath: str, appendix: str = '', suffix: str = '', remove_suffix = False):
if appendix and not appendix.startswith('_'): appendix = '_' + appendix
if suffix and not suffix.startswith('.'): suffix = '.' + suffix

p = Path(filepath)
name = p.name.replace(p.suffix, '').split('_')[0]
new_suffix = suffix or p.suffix
new_suffix = suffix or p.suffix if not remove_suffix else ''

return p.with_name(name + appendix + new_suffix)

def filename_to_date(filename: str):
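A quick check of the new remove_suffix flag (filenames here are hypothetical; the function keeps only the first underscore-delimited token of the name):

from pathlib import Path

change_filename('/tmp/files/20220101_downloaded.nc', suffix='png')
# -> PosixPath('/tmp/files/20220101.png')   (single-image output)
change_filename('/tmp/files/20220101_downloaded.nc', remove_suffix=True)
# -> PosixPath('/tmp/files/20220101')       (bare path, e.g. a gdal2tiles output dir)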
62 changes: 62 additions & 0 deletions pipeline/dags/permafrost.pfr.py
@@ -0,0 +1,62 @@
from datetime import datetime, timedelta
import task_factories
from airflow import DAG
from airflow.models.param import Param

# layer
LAYER_ID = 'permafrost'
LAYER_VARIABLE = 'pfr'
LAYER_VERSION = '1.2.3'
RESOLUTION = '36366 18182'
METADATA = {
"id": f'{LAYER_ID}.{LAYER_VARIABLE}',
"timestamps": [], # will be injected
"min_value": 0,
"max_value": 10,
"type": "tiles", # 'tiles' or 'image'
"zoom_levels": '0-2',
"units": 'xxx',
"basemap": 'blue',
"legend_values": ["100 %", "0"],
"time_format": {
"year": "numeric",
"month": "long"
}
}

# dev
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
WORKDIR = '/tmp/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
DEBUG=False

dag_params = {
"max_files": Param(1, type="integer", minimum=0)
}

with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:

# create tasks
clean_workdir = task_factories.clean_workdir(workdir=WORKDIR)
list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE.upper(), color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
upload = task_factories.gcloud_upload_dir(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, directory=WORKDIR)

# connect tasks
files = list_files()
clean_workdir >> files
downloads = download.expand(filename=files)
gdal_transforms(downloads) >> upload
clean_workdir >> legend_image
metadata(files)

if DEBUG:
downloads >> task_factories.gdal_info()
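
# Sizing note: at RESOLUTION '36366 18182' a single warped frame is ~661 Mpx
# (36366 * 18182 ≈ 6.6e8), i.e. roughly 2.6 GB as uncompressed RGBA, which is
# presumably why max_tis_dem and max_tis_translate are capped at 1 for this
# layer while cloud.cfc keeps the default parallelism.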



87 changes: 54 additions & 33 deletions pipeline/dags/task_factories.py
@@ -32,7 +32,7 @@ def fn(filename: str):
return local_filename
return fn

def gcs_upload_file(bucket_name=str, layer_id=str, layer_variable=str, layer_version=str):
def gcs_upload_file(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str):
@task(task_id='gcs_upload_file')
def fn(filename: str):
hook = GCSHook('google')
@@ -41,6 +41,18 @@ def fn(filename: str):
return filename
return fn

def gcloud_upload_dir(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, directory: str):
return BashOperator(
task_id='gcloud_upload',
bash_command='gcloud auth activate-service-account --key-file $KEY_FILE && gsutil -m cp -r $UPLOAD_DIR/* $BUCKET',
env={
"UPLOAD_DIR": directory,
"BUCKET": f'gs://{bucket_name}/{layer_id}.{layer_variable}/{layer_version}/final/',
"KEY_FILE": '/opt/airflow/plugins/service-account.json',
"CLOUDSDK_PYTHON": '/usr/local/bin/python'
}
)
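# Example: with the permafrost values above this renders to roughly
#   gcloud auth activate-service-account --key-file /opt/airflow/plugins/service-account.json \
#     && gsutil -m cp -r /tmp/files/* gs://esa-cfs-pipeline-output/permafrost.pfr/1.2.3/final/
# (-m parallelizes the copy; note that `cp -r $UPLOAD_DIR/*` ships everything
# currently sitting in WORKDIR, not just the final transform outputs)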

def gdal_info():
return BashOperator(
task_id='gdal_info',
@@ -49,56 +61,65 @@ def gdal_info():
)

def legend_image(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, color_file: str):
@task_group(group_id='legend_image_group')
def fn():
upload_legend = gcs_upload_file(bucket_name=bucket_name, layer_id=layer_id, layer_variable=layer_variable, layer_version=layer_version)
legend_image = BashOperator(
task_id='legend_image',
bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
env={"FILEPATH_OUT": f'{workdir}/legend.png', "COLOR_FILE": color_file}
)
legend_image >> upload_legend(legend_image.output)
return fn
return BashOperator(
task_id='legend_image',
bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
env={"FILEPATH_OUT": f'{workdir}/legend.png', "COLOR_FILE": color_file}
)

# generate timestamps array from filenames and write metadata to json file then upload to gcs
def metadata(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, metadata: dict):
@task_group(group_id='metadata_group')
@task(task_id='metadata')
def fn(files):
@task()
def write_metadata_json(files):
timestamps = str(files)
extended_metadata = dict({"timestamps": timestamps}, **metadata)
filepath = str(Path(workdir).joinpath('metadata.json'))
timestamps = list(map(helper.filename_to_date, files))
extended_metadata = dict(metadata, **{"timestamps": timestamps})
filepath = str(Path(workdir).joinpath('metadata.json'))

with open(filepath, "w") as f:
metadata_string = json.dumps(extended_metadata, indent=4)
f.write(metadata_string)
print(metadata_string)

with open(filepath, "w") as f:
metadata_string = json.dumps(extended_metadata, indent=4)
f.write(metadata_string)
print(metadata_string)
return filepath
return fn

return filepath
def gdal_transforms(layer_variable: str, color_file: str, layer_type: str, zoom_levels: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512', warp_cmd: str = None, max_tis_warp: int = 4, max_tis_dem: int = 4, max_tis_translate: int = 4):
def get_transform_task():
if layer_type == 'image':
return BashOperator.partial(
task_id='gdal_translate_image',
bash_command='rm -f $FILEPATH_OUT && gdal_translate -of PNG $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT',
max_active_tis_per_dag=max_tis_translate
)

return BashOperator.partial(
task_id='gdal_translate_tiles',
bash_command=f'rm -rf $FILEPATH_OUT && gdal2tiles.py --profile geodetic --zoom={zoom_levels} --tmscompatible --no-kml --webviewer=none --resampling average --s_srs EPSG:4326 $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT' ,
max_active_tis_per_dag=max_tis_translate
)

upload_metadata = gcs_upload_file(bucket_name=bucket_name, layer_id=layer_id, layer_variable=layer_variable, layer_version=layer_version)
def get_transform_outpath(filename):
if layer_type == 'image':
return helper.change_filename(filename, appendix='', suffix='png')

return upload_metadata(write_metadata_json(files))
return fn
return helper.change_filename(filename, appendix='', remove_suffix=True)

def gdal_transforms(layer_variable: str, color_file: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512'):
@task_group(group_id='gdal_transforms_group')
def fn(downloads):
warp_command = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT' if not warp_cmd else warp_cmd
gdal_warp = BashOperator.partial(
task_id='gdal_warp',
bash_command=f'rm -f $FILEPATH_OUT && gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT && echo $FILEPATH_OUT'
bash_command=f'rm -f $FILEPATH_OUT && {warp_command} && echo $FILEPATH_OUT',
max_active_tis_per_dag=max_tis_warp
)

gdal_dem = BashOperator.partial(
task_id='gdal_dem',
bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT'
bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT',
max_active_tis_per_dag=max_tis_dem
)

gdal_translate = BashOperator.partial(
task_id='gdal_translate',
bash_command='rm -f $FILEPATH_OUT && gdal_translate -of PNG $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT'
)
gdal_translate = get_transform_task()

gdal_warps = gdal_warp.expand(
env=downloads.map(lambda filename: {
@@ -119,7 +140,7 @@ def fn(downloads):
gdal_translates = gdal_translate.expand(
env=gdal_dems.output.map(lambda filename: {
"FILEPATH_IN": filename,
"FILEPATH_OUT": helper.change_filename(filename, appendix='', suffix='png'),
"FILEPATH_OUT": get_transform_outpath(filename)
})
)

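For reference, with the permafrost METADATA above the reworked metadata task now writes $WORKDIR/metadata.json (shipped later by gcloud_upload_dir) along these lines; the timestamp strings are illustrative, since their exact shape depends on helper.filename_to_date:

{
    "id": "permafrost.pfr",
    "timestamps": ["2022-01-01", "2022-02-01"],
    "min_value": 0,
    "max_value": 10,
    "type": "tiles",
    "zoom_levels": "0-2",
    "units": "xxx",
    "basemap": "blue",
    "legend_values": ["100 %", "0"],
    "time_format": {"year": "numeric", "month": "long"}
}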
7 changes: 7 additions & 0 deletions pipeline/plugins/colors/permafrost.PFR.txt
@@ -0,0 +1,7 @@
100 37 87 172 255
80 37 87 172 204
60 37 87 172 153
40 37 87 172 102
20 37 87 172 51
0 37 87 172 0
nv 0 0 0 0
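
Each line follows the gdaldem color-relief text format consumed by the gdal_dem task: a pixel value followed by R G B A, with nv catching nodata. All stops share a single blue (37, 87, 172) and only the alpha ramps linearly with the permafrost value, which a quick sketch reproduces:

# alpha = round(value / 100 * 255) reproduces the table above exactly
for value in (100, 80, 60, 40, 20, 0):
    print(value, 37, 87, 172, round(value / 100 * 255))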
