diff --git a/pipeline/Dockerfile b/pipeline/Dockerfile
index 6e97b0191..7d7bdc6fb 100644
--- a/pipeline/Dockerfile
+++ b/pipeline/Dockerfile
@@ -2,11 +2,15 @@ FROM apache/airflow:2.4.2
 
 USER root
 
+# gdal
 RUN apt update && apt install -y software-properties-common && add-apt-repository ppa:ubuntugis/ppa && apt install -y libgdal-dev gdal-bin && sudo add-apt-repository -r ppa:ubuntugis/ppa
 
+# gcloud
+RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && apt-get update -y && apt-get install google-cloud-cli -y
+
+# nodejs + node-canvas module
 RUN curl -fsSL https://deb.nodesource.com/setup_16.x | bash - && apt install -y nodejs && rm -rf /var/lib/apt/lists/*
-RUN apt update && sudo apt install -y build-essential libcairo2-dev libpango1.0-dev libjpeg-dev libgif-dev librsvg2-dev
+RUN apt update && sudo apt install -y build-essential libcairo2-dev libpango1.0-dev libjpeg-dev libgif-dev librsvg2-dev
+RUN sudo npm install -g canvas
 
 USER airflow
-RUN sudo npm install -g canvas
diff --git a/pipeline/dags/cloud.cfc.py b/pipeline/dags/cloud.cfc.py
index 373438b9b..77d98af4f 100644
--- a/pipeline/dags/cloud.cfc.py
+++ b/pipeline/dags/cloud.cfc.py
@@ -7,13 +7,14 @@
 LAYER_ID = 'cloud'
 LAYER_VARIABLE = 'cfc'
 LAYER_VERSION = '1.2.3'
+RESOLUTION = '1024 512'
 METADATA = {
     "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
     "timestamps": [], # will be injected
     "min_value": 0,
     "max_value": 10,
-    "type": "tiles",
-    "zoom_levels": 7,
+    "type": "image", # 'tiles' or 'image'
+    "zoom_levels": '0-2',
     "units": 'mg/m2',
     "basemap": 'blue',
     "legend_values": ["100 %", "0"],
@@ -24,12 +25,12 @@
 }
 
 # dev
-NUM_FILES = 10
 BUCKET_ORIGIN = 'esa-cfs-cate-data'
 BUCKET_TMP = 'esa-cfs-pipeline-tmp'
 BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
 WORKDIR = '/tmp/files'
 COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
+DEBUG = False
 
 dag_params = {
     "max_files": Param(2, type="integer", minimum=0)
@@ -41,19 +42,21 @@
     clean_workdir = task_factories.clean_workdir(workdir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
-    gdal_info = task_factories.gdal_info()
     legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
-    gdal_transforms = task_factories.gdal_transforms(LAYER_VARIABLE, COLOR_FILE)
-    upload = task_factories.gcs_upload_file(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION)
+    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
+    upload = task_factories.gcloud_upload_dir(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, directory=WORKDIR)
 
     # connect tasks
     files = list_files()
     clean_workdir >> files
-    clean_workdir >> legend_image()
-    metadata(files)
     downloads = download.expand(filename=files)
-    downloads >> gdal_info
-    final_images = gdal_transforms(downloads)
-    upload.expand(filename=final_images.output)
+    gdal_transforms(downloads) >> upload
+    clean_workdir >> legend_image
+    metadata(files)
+
+    if DEBUG:
+        downloads >> task_factories.gdal_info()
+
 
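Note: the upload step above replaces the per-file gcs_upload_file.expand(...) mapping with a single
gcloud_upload_dir BashOperator (defined in pipeline/dags/task_factories.py below). A sketch of what
it resolves to for this DAG, derived from the constants in the diff; not generated output:

    # env rendered for the cloud.cfc 'gcloud_upload' task (sketch)
    env = {
        "UPLOAD_DIR": "/tmp/files",                                       # WORKDIR
        "BUCKET": "gs://esa-cfs-pipeline-output/cloud.cfc/1.2.3/final/",  # BUCKET_OUTPUT / id / version
        "KEY_FILE": "/opt/airflow/plugins/service-account.json",
        "CLOUDSDK_PYTHON": "/usr/local/bin/python",
    }
    # bash_command then expands to one copy of the whole work directory:
    #   gcloud auth activate-service-account --key-file /opt/airflow/plugins/service-account.json \
    #     && gsutil -m cp -r /tmp/files/* gs://esa-cfs-pipeline-output/cloud.cfc/1.2.3/final/
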
diff --git a/pipeline/dags/helper.py b/pipeline/dags/helper.py
index b92b69990..4782a7bb0 100644
--- a/pipeline/dags/helper.py
+++ b/pipeline/dags/helper.py
@@ -4,13 +4,14 @@ def only_filename(filepath: str):
     return Path(filepath).name
 
-def change_filename(filepath: str, appendix: str = '', suffix: str = ''):
+def change_filename(filepath: str, appendix: str = '', suffix: str = '', remove_suffix=False):
     if appendix and not appendix.startswith('_'):
         appendix = '_' + appendix
     if suffix and not suffix.startswith('.'):
         suffix = '.' + suffix
     p = Path(filepath)
     name = p.name.replace(p.suffix, '').split('_')[0]
-    new_suffix = suffix or p.suffix
+    new_suffix = '' if remove_suffix else (suffix or p.suffix)
+
     return p.with_name(name + appendix + new_suffix)
 
 def filename_to_date(filename: str):
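Note: remove_suffix exists for the new tiles output, where gdal2tiles.py is handed a directory path
rather than a file. A quick sketch of the intended behavior (the filenames are hypothetical; see
get_transform_outpath in pipeline/dags/task_factories.py below):

    from helper import change_filename

    # 'image' layers: swap the suffix for the PNG written by gdal_translate
    change_filename('/tmp/files/20220101_colored.tiff', suffix='png')
    # -> Path('/tmp/files/20220101.png')

    # 'tiles' layers: drop the suffix so gdal2tiles.py writes into a directory
    change_filename('/tmp/files/20220101_colored.tiff', remove_suffix=True)
    # -> Path('/tmp/files/20220101')
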
diff --git a/pipeline/dags/permafrost.pfr.py b/pipeline/dags/permafrost.pfr.py
new file mode 100644
index 000000000..ca53c52c5
--- /dev/null
+++ b/pipeline/dags/permafrost.pfr.py
@@ -0,0 +1,62 @@
+from datetime import datetime, timedelta
+import task_factories
+from airflow import DAG
+from airflow.models.param import Param
+
+# layer
+LAYER_ID = 'permafrost'
+LAYER_VARIABLE = 'pfr'
+LAYER_VERSION = '1.2.3'
+RESOLUTION = '36366 18182'
+METADATA = {
+    "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
+    "timestamps": [], # will be injected
+    "min_value": 0,
+    "max_value": 10,
+    "type": "tiles", # 'tiles' or 'image'
+    "zoom_levels": '0-2',
+    "units": 'xxx',
+    "basemap": 'blue',
+    "legend_values": ["100 %", "0"],
+    "time_format": {
+        "year": "numeric",
+        "month": "long"
+    }
+}
+
+# dev
+BUCKET_ORIGIN = 'esa-cfs-cate-data'
+BUCKET_TMP = 'esa-cfs-pipeline-tmp'
+BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
+WORKDIR = '/tmp/files'
+COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
+DEBUG = False
+
+dag_params = {
+    "max_files": Param(1, type="integer", minimum=0)
+}
+
+with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:
+
+    # create tasks
+    clean_workdir = task_factories.clean_workdir(workdir=WORKDIR)
+    list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
+    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
+    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
+    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE.upper(), color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
+    upload = task_factories.gcloud_upload_dir(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, directory=WORKDIR)
+
+    # connect tasks
+    files = list_files()
+    clean_workdir >> files
+    downloads = download.expand(filename=files)
+    gdal_transforms(downloads) >> upload
+    clean_workdir >> legend_image
+    metadata(files)
+
+    if DEBUG:
+        downloads >> task_factories.gdal_info()
+
+
diff --git a/pipeline/dags/task_factories.py b/pipeline/dags/task_factories.py
index b0bad4647..5ffba1700 100644
--- a/pipeline/dags/task_factories.py
+++ b/pipeline/dags/task_factories.py
@@ -32,7 +32,7 @@ def fn(filename: str):
         return local_filename
     return fn
 
-def gcs_upload_file(bucket_name=str, layer_id=str, layer_variable=str, layer_version=str):
+def gcs_upload_file(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str):
     @task(task_id='gcs_upload_file')
     def fn(filename: str):
         hook = GCSHook('google')
@@ -41,6 +41,18 @@ def fn(filename: str):
         return filename
     return fn
 
+def gcloud_upload_dir(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, directory: str):
+    return BashOperator(
+        task_id='gcloud_upload',
+        bash_command='gcloud auth activate-service-account --key-file $KEY_FILE && gsutil -m cp -r $UPLOAD_DIR/* $BUCKET',
+        env={
+            "UPLOAD_DIR": directory,
+            "BUCKET": f'gs://{bucket_name}/{layer_id}.{layer_variable}/{layer_version}/final/',
+            "KEY_FILE": '/opt/airflow/plugins/service-account.json',
+            "CLOUDSDK_PYTHON": '/usr/local/bin/python'
+        }
+    )
+
 def gdal_info():
     return BashOperator(
         task_id='gdal_info',
@@ -49,56 +61,65 @@
     )
 
 def legend_image(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, color_file: str):
-    @task_group(group_id='legend_image_group')
-    def fn():
-        upload_legend = gcs_upload_file(bucket_name=bucket_name, layer_id=layer_id, layer_variable=layer_variable, layer_version=layer_version)
-        legend_image = BashOperator(
-            task_id='legend_image',
-            bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
-            env={"FILEPATH_OUT": f'{workdir}/legend.png', "COLOR_FILE": color_file}
-        )
-        legend_image >> upload_legend(legend_image.output)
-    return fn
+    return BashOperator(
+        task_id='legend_image',
+        bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
+        env={"FILEPATH_OUT": f'{workdir}/legend.png', "COLOR_FILE": color_file}
+    )
 
 # generate timestamps array from filenames and write metadata to json file then upload to gcs
 def metadata(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, metadata: dict):
-    @task_group(group_id='metadata_group')
+    @task(task_id='metadata')
     def fn(files):
-        @task()
-        def write_metadata_json(files):
-            timestamps = str(files)
-            extended_metadata = dict({"timestamps": timestamps}, **metadata)
-            filepath = str(Path(workdir).joinpath('metadata.json'))
+        timestamps = list(map(helper.filename_to_date, files))
+        extended_metadata = dict(metadata, **{"timestamps": timestamps})
+        filepath = str(Path(workdir).joinpath('metadata.json'))
+
+        with open(filepath, "w") as f:
+            metadata_string = json.dumps(extended_metadata, indent=4)
+            f.write(metadata_string)
+            print(metadata_string)
 
-            with open(filepath, "w") as f:
-                metadata_string = json.dumps(extended_metadata, indent=4)
-                f.write(metadata_string)
-                print(metadata_string)
+        return filepath
+    return fn
 
-            return filepath
+def gdal_transforms(layer_variable: str, color_file: str, layer_type: str, zoom_levels: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512', warp_cmd: str = None, max_tis_warp: int = 4, max_tis_dem: int = 4, max_tis_translate: int = 4):
+    def get_transform_task():
+        if layer_type == 'image':
+            return BashOperator.partial(
+                task_id='gdal_translate_image',
+                bash_command='rm -f $FILEPATH_OUT && gdal_translate -of PNG $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT',
+                max_active_tis_per_dag=max_tis_translate
+            )
+
+        return BashOperator.partial(
+            task_id='gdal_translate_tiles',
+            bash_command=f'rm -rf $FILEPATH_OUT && gdal2tiles.py --profile geodetic --zoom={zoom_levels} --tmscompatible --no-kml --webviewer=none --resampling average --s_srs EPSG:4326 $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT',
+            max_active_tis_per_dag=max_tis_translate
+        )
 
-        upload_metadata = gcs_upload_file(bucket_name=bucket_name, layer_id=layer_id, layer_variable=layer_variable, layer_version=layer_version)
+    def get_transform_outpath(filename):
+        if layer_type == 'image':
+            return helper.change_filename(filename, appendix='', suffix='png')
 
-        return upload_metadata(write_metadata_json(files))
-    return fn
+        return helper.change_filename(filename, appendix='', remove_suffix=True)
 
-def gdal_transforms(layer_variable: str, color_file: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512'):
     @task_group(group_id='gdal_transforms_group')
     def fn(downloads):
+        warp_command = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT' if not warp_cmd else warp_cmd
         gdal_warp = BashOperator.partial(
             task_id='gdal_warp',
-            bash_command=f'rm -f $FILEPATH_OUT && gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT && echo $FILEPATH_OUT'
+            bash_command=f'rm -f $FILEPATH_OUT && {warp_command} && echo $FILEPATH_OUT',
+            max_active_tis_per_dag=max_tis_warp
         )
         gdal_dem = BashOperator.partial(
             task_id='gdal_dem',
-            bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT'
+            bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT',
+            max_active_tis_per_dag=max_tis_dem
        )
-        gdal_translate = BashOperator.partial(
-            task_id='gdal_translate',
-            bash_command='rm -f $FILEPATH_OUT && gdal_translate -of PNG $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT'
-        )
+        gdal_translate = get_transform_task()
 
         gdal_warps = gdal_warp.expand(
             env=downloads.map(lambda filename: {
@@ -119,7 +140,7 @@ def fn(downloads):
         gdal_translates = gdal_translate.expand(
             env=gdal_dems.output.map(lambda filename: {
                 "FILEPATH_IN": filename,
-                "FILEPATH_OUT": helper.change_filename(filename, appendix='', suffix='png'),
+                "FILEPATH_OUT": get_transform_outpath(filename)
             })
         )
 
diff --git a/pipeline/plugins/colors/permafrost.pfr.txt b/pipeline/plugins/colors/permafrost.pfr.txt
new file mode 100644
index 000000000..317d9552e
--- /dev/null
+++ b/pipeline/plugins/colors/permafrost.pfr.txt
@@ -0,0 +1,7 @@
+100 37 87 172 255
+80 37 87 172 204
+60 37 87 172 153
+40 37 87 172 102
+20 37 87 172 51
+0 37 87 172 0
+nv 0 0 0 0
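Note: the color file uses the gdaldem color-relief text format, one "value R G B A" row per step and
"nv" for nodata; since gdal_dem passes -alpha, the fifth column drives transparency, so the permafrost
layer fades from opaque blue at 100 to fully transparent at 0. A sketch of how the two translate
branches differ in output shape for one timestamp (paths are hypothetical, not generated output):

    # layer_type == 'image' (cloud.cfc): gdal_translate writes one PNG
    #   /tmp/files/20220101.png
    #
    # layer_type == 'tiles' (permafrost.pfr): gdal2tiles.py writes a tile
    # pyramid for zoom levels 0-2 under the extensionless output path
    #   /tmp/files/20220101/{z}/{x}/{y}.png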