Skip to content

Commit

Permalink
feat(pipeline): vegetation lai
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Helms authored and Andreas Helms committed Feb 28, 2024
1 parent c3103a1 commit cbcaa84
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pipeline/dags/task_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def fn(filename: str):
def gcloud_upload_dir(layer_id: str, layer_variable: str, directory: str):
return BashOperator(
task_id='gcloud_upload',
bash_command='gcloud auth activate-service-account --key-file $KEY_FILE && gsutil -q -m cp -r $UPLOAD_DIR/* $BUCKET',
bash_command='gcloud auth activate-service-account --key-file $KEY_FILE && gsutil -m -h "Cache-Control:no-cache" rsync -d -r $UPLOAD_DIR $BUCKET',
env={
"UPLOAD_DIR": directory,
"BUCKET": 'gs://{{ dag_run.conf["output_bucket"] }}/{{ dag_run.conf["layer_version"] }}/' + f'{layer_id}.{layer_variable}/',
"BUCKET": 'gs://{{ dag_run.conf["output_bucket"] }}/{{ dag_run.conf["layer_version"] }}/' + f'{layer_id}.{layer_variable}',
"KEY_FILE": '/opt/airflow/plugins/service-account.json',
"CLOUDSDK_PYTHON": '/usr/local/bin/python'
}
Expand Down Expand Up @@ -155,7 +155,7 @@ def fn(files, **context):
return fn


def gdal_transforms(layer_variable: str, color_file: str, layer_type: str, zoom_levels: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512', warp_cmd: str = None, max_tis_warp: int = 4, max_tis_dem: int = 4, max_tis_translate: int = 4):
def gdal_transforms(color_file: str, layer_type: str, zoom_levels: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512', layer_variable: str = '', warp_cmd: str = None, max_tis_warp: int = 4, max_tis_dem: int = 4, max_tis_translate: int = 4):
def get_transform_task():
if layer_type == 'image':
return BashOperator.partial(
Expand All @@ -178,7 +178,8 @@ def get_transform_outpath(filename):

@task_group(group_id='gdal_transforms_group')
def fn(downloads):
warp_command = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT' if not warp_cmd else warp_cmd
file_path_in = 'NETCDF:"$FILEPATH_IN":$DATA_VARIABLE' if layer_variable else '$FILEPATH_IN'
warp_command = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW {file_path_in} $FILEPATH_OUT' if not warp_cmd else warp_cmd
gdal_warp = BashOperator.partial(
task_id='reproject_and_to_tiff',
bash_command=f'rm -f $FILEPATH_OUT && {warp_command} && echo $FILEPATH_OUT',
Expand Down
69 changes: 69 additions & 0 deletions pipeline/dags/vegetation.lai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import datetime
import task_factories
from airflow import DAG
from airflow.models.param import Param
from helper import get_default_layer_version

# layer
LAYER_ID = 'vegetation'
LAYER_VARIABLE = 'lai'
RESOLUTION = '2240 12320'
METADATA = {
"id": f'{LAYER_ID}.{LAYER_VARIABLE}',
"timestamps": [], # will be injected
"min_value": 0,
"max_value": 8,
"type": "tiles", # 'tiles' or 'image'
"zoom_levels": '0-7',
"units": '',
"basemap": None,
"legend_values": ["8 m²/m²", "0"],
"time_format": {
"year": "numeric",
"month": "long",
"day": "numeric"
}
}

# dev
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
WORKDIR = '/workdir/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
DEBUG = False

default_layer_version = get_default_layer_version()
dag_params = {
"max_files": Param(2, type=["null", "integer"], minimum=0,),
"output_bucket": Param("esa-cfs-pipeline-output", type=["string"], enum=['esa-cfs-pipeline-output', 'esa-cfs-tiles']),
"skip_downloads": Param(False, type="boolean"),
"layer_version": Param(default_layer_version, type="string")
}

with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:

# create tasks
clean_workdir = task_factories.clean_dir_skippable(
task_id='clean_workdir', dir=WORKDIR)()
list_files = task_factories.gcs_list_files(
bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
download = task_factories.gcs_download_file(
bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
legend_image = task_factories.legend_image(
workdir=WORKDIR, color_file=COLOR_FILE)
metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
gdal_transforms = task_factories.gdal_transforms(
color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
upload = task_factories.upload(
WORKDIR, LAYER_ID, LAYER_VARIABLE, METADATA['type'])

# connect tasks
files = list_files()
clean_workdir >> files
downloads = download.expand(filename=files)
gdal_transforms(downloads) >> upload()
clean_workdir >> legend_image
metadata(files)

if DEBUG:
downloads >> task_factories.gdal_info()
6 changes: 6 additions & 0 deletions pipeline/plugins/colors/vegetation.lai.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
7.8 0 104 55 255
4.28 106 191 114 255
2.49 173 221 144 255
0.98 224 242 178 255
0 255 255 204 255
nv 0 0 0 0
Binary file added pipeline/plugins/layer-icons/vegetation.lai.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit cbcaa84

Please sign in to comment.