diff --git a/pipeline/dags/cloud.cfc.py b/pipeline/dags/cloud.cfc.py
index f5dc50f9a..50648444c 100644
--- a/pipeline/dags/cloud.cfc.py
+++ b/pipeline/dags/cloud.cfc.py
@@ -43,7 +43,7 @@
     clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
-    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
     upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
diff --git a/pipeline/dags/land_cover.lccs_class.py b/pipeline/dags/land_cover.lccs_class.py
index dda7f77b6..f37811696 100644
--- a/pipeline/dags/land_cover.lccs_class.py
+++ b/pipeline/dags/land_cover.lccs_class.py
@@ -45,7 +45,7 @@
     clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR, dry_run=dry_run)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=dry_run)
-    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_warp=1, max_tis_dem=1, max_tis_translate=1)
     upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
diff --git a/pipeline/dags/permafrost.pfr.py b/pipeline/dags/permafrost.pfr.py
index 672b234d2..206676c1d 100644
--- a/pipeline/dags/permafrost.pfr.py
+++ b/pipeline/dags/permafrost.pfr.py
@@ -43,9 +43,8 @@
     clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
-    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
-    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE.upper(), min_value=METADATA['min_value'], max_value=METADATA['min_value'])
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE.upper(), color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
     upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
 
@@ -53,8 +52,7 @@
     files = list_files()
     clean_workdir >> files
     downloads = download.expand(filename=files)
-    clamps = clamp_netcdf.expand(filename=downloads)
-    gdal_transforms(clamps) >> upload()
+    gdal_transforms(downloads) >> upload()
     clean_workdir >> legend_image
     metadata(files)
 
diff --git a/pipeline/dags/sea_ice.ice_conc.py b/pipeline/dags/sea_ice.ice_conc.py
new file mode 100644
index 000000000..786f1c2a5
--- /dev/null
+++ b/pipeline/dags/sea_ice.ice_conc.py
@@ -0,0 +1,144 @@
+from datetime import datetime
+import task_factories
+import helper
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.operators.bash import BashOperator
+
+# layer
+LAYER_ID = 'sea_ice'
+LAYER_VARIABLE = 'ice_conc'
+LAYER_VERSION = '1.2.3'
+RESOLUTION = '2444 496'
+METADATA = {
+    "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
+    "version": LAYER_VERSION,
+    "timestamps": [],  # will be injected
+    "min_value": 0,
+    "max_value": 100,
+    "type": "tiles",  # 'tiles' or 'image'
+    "zoom_levels": '0-3',
+    "units": '%',
+    "basemap": 'blue',
+    "legend_values": ["100 %", "0"],
+    "time_format": {
+        "year": "numeric",
+        "month": "long"
+    }
+}
+
+# dev
+BUCKET_ORIGIN = 'esa-cfs-cate-data'
+BUCKET_TMP = 'esa-cfs-pipeline-tmp'
+BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
+WORKDIR = '/workdir/files'
+COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
+DEBUG = False
+
+dag_params = {
+    "max_files": Param(1, type="integer", minimum=0)
+}
+
+
+with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:
+
+    dry_run = False
+
+    # create tasks
+    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR, dry_run=dry_run)
+
+    list_files_nh = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID + '_nh', layer_variable=LAYER_VARIABLE)
+    download_nh = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_nh_downloaded', task_id='download_file_nh', dry_run=dry_run)
+
+    list_files_sh = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID + '_sh', layer_variable=LAYER_VARIABLE)
+    download_sh = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_sh_downloaded', task_id='download_file_sh', dry_run=dry_run)
+
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
+    metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
+
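+    # per-hemisphere target extents: each polar product is warped onto its own
+    # latitude band of a shared global EPSG:4326 grid, so the two results can
+    # be mosaicked into a single global GeoTIFF further below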
+    gdal_te_nh = "-180 16.62393 180 90"
+    gdal_te_sh = "-180 -90 180 -16.62393"
+    warp_command_nh = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te_nh} -ts {RESOLUTION} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT'
+    warp_command_sh = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te_sh} -ts {RESOLUTION} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT'
+
+    gdal_warp_nh = BashOperator.partial(
+        task_id='reproject_and_to_tiff_nh',
+        bash_command=f'rm -f $FILEPATH_OUT && {warp_command_nh} && echo $FILEPATH_OUT'
+    )
+    gdal_warp_sh = BashOperator.partial(
+        task_id='reproject_and_to_tiff_sh',
+        bash_command=f'rm -f $FILEPATH_OUT && {warp_command_sh} && echo $FILEPATH_OUT'
+    )
+
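+    # mosaic the two hemisphere GeoTIFFs into one global image; latitudes
+    # between the two warp extents receive no input pixels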
+    gdal_merge = BashOperator.partial(
+        task_id='merge_tiffs',
+        bash_command=f'rm -f $FILEPATH_OUT && gdal_merge.py -o $FILEPATH_OUT $FILEPATH_IN_NH $FILEPATH_IN_SH && echo $FILEPATH_OUT'
+    )
+
+    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
+
+
+    gdal_dem = BashOperator.partial(
+        task_id='gdal_dem',
+        bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT'
+    )
+
+    gdal_translate = BashOperator.partial(
+        task_id='gdal_translate_tiles',
+        bash_command=f'rm -rf $FILEPATH_OUT && gdal2tiles.py --profile geodetic --zoom={METADATA["zoom_levels"]} --tmscompatible --no-kml --webviewer=none --resampling average --s_srs EPSG:4326 $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT'
+    )
+
+
+    # connect tasks
+    files_nh = list_files_nh()
+    files_sh = list_files_sh()
+
+    clean_workdir >> files_nh
+    clean_workdir >> files_sh
+
+    downloads_nh = download_nh.expand(filename=files_nh)
+    downloads_sh = download_sh.expand(filename=files_sh)
+
+    gdal_warps_nh = gdal_warp_nh.expand(
+        env=downloads_nh.map(lambda filename: {
+            "FILEPATH_IN": filename,
+            "FILEPATH_OUT": helper.change_filename(filename, appendix='_nh_warped', suffix='tiff'),
+            "DATA_VARIABLE": LAYER_VARIABLE
+        })
+    )
+
+    gdal_warps_sh = gdal_warp_sh.expand(
+        env=downloads_sh.map(lambda filename: {
+            "FILEPATH_IN": filename,
+            "FILEPATH_OUT": helper.change_filename(filename, appendix='_sh_warped', suffix='tiff'),
+            "DATA_VARIABLE": LAYER_VARIABLE
+        })
+    )
+
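+    # both merge inputs are derived from the SH warp output name; this assumes
+    # NH and SH downloads share a basename, so helper.change_filename can swap
+    # the appendix to point at the matching NH file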
+    gdal_merges = gdal_merge.expand(
+        env=gdal_warps_sh.output.map(lambda filename: {
+            "FILEPATH_OUT": helper.change_filename(filename, appendix='_merged', suffix='tiff'),
+            "FILEPATH_IN_NH": helper.change_filename(filename, appendix='_nh_warped', suffix='tiff'),
+            "FILEPATH_IN_SH": helper.change_filename(filename, appendix='_sh_warped', suffix='tiff')
+        })
+    )
+
+    gdal_dems = gdal_dem.expand(
+        env=gdal_merges.output.map(lambda filename: {
+            "FILEPATH_IN": filename,
+            "FILEPATH_OUT": helper.change_filename(filename, appendix='colored'),
+            "COLOR_FILE": COLOR_FILE
+        })
+    )
+
+    gdal_translates = gdal_translate.expand(
+        env=gdal_dems.output.map(lambda filename: {
+            "FILEPATH_IN": filename,
+            "FILEPATH_OUT": helper.change_filename(filename, appendix='', remove_suffix=True)
+        })
+    )
+
+    gdal_translates >> upload()
+
+    clean_workdir >> legend_image
+    metadata(files_nh)  # only needs one set of images
\ No newline at end of file
diff --git a/pipeline/dags/task_factories.py b/pipeline/dags/task_factories.py
index ced9d1514..6159fcc52 100644
--- a/pipeline/dags/task_factories.py
+++ b/pipeline/dags/task_factories.py
@@ -33,8 +33,8 @@ def fn(**context):
     return fn
 
 
-def gcs_download_file(bucket_name: str, dir: str, appendix: str = '', dry_run=False):
-    @task(task_id='gcs_download_file' if not dry_run else 'gcs_download_file_dry_run')
+def gcs_download_file(bucket_name: str, dir: str, appendix: str = '', dry_run=False, task_id="gcs_download_file"):
+    @task(task_id=task_id if not dry_run else 'gcs_download_file_dry_run')
     def fn(filename: str):
         hook = GCSHook('google')
         local_filename = dir + '/' + \
@@ -77,7 +77,7 @@ def gdal_info():
     )
 
 
-def legend_image(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, color_file: str):
+def legend_image(workdir: str, color_file: str):
     return BashOperator(
         task_id='legend_image',
         bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
@@ -103,6 +103,7 @@ def fn(files):
             "units": extended_metadata['units'],
             "legendValues": extended_metadata['legend_values'],
             "timeFormat": extended_metadata['time_format'],
+            "basemap": extended_metadata['basemap']
         }
 
         filepath = str(Path(workdir).joinpath('metadata.json'))
@@ -232,17 +233,17 @@ def fn():
     return fn
 
 
-def clamp_netcdf(layer_variable: str, min_value: float, max_value: float):
-    @task(task_id='clamp_netcdf')
-    def fn(filename):
-        import xarray as xr
-        print(filename)
-        ds = xr.open_dataset(filename)
-        ds[layer_variable] = ds[layer_variable].clip(min_value, max_value)
-        new_filename = helper.change_filename(filename, appendix='clamped')
-        encoding = {var: {"zlib": True, "complevel": 9} for var in ds.data_vars}
-        ds.to_netcdf(new_filename, engine='netcdf4', encoding=encoding)
-        ds.close()
-        return new_filename
-
-    return fn
+# def clamp_netcdf(layer_variable: str, min_value: float, max_value: float):
+#     @task(task_id='clamp_netcdf')
+#     def fn(filename):
+#         import xarray as xr
+#         print(filename)
+#         ds = xr.open_dataset(filename)
+#         ds[layer_variable] = ds[layer_variable].clip(min_value, max_value)
+#         new_filename = helper.change_filename(filename, appendix='clamped')
+#         encoding = {var: {"zlib": True, "complevel": 9} for var in ds.data_vars}
+#         ds.to_netcdf(new_filename, engine='netcdf4', encoding=encoding)
+#         ds.close()
+#         return new_filename
+
+#     return fn
diff --git a/pipeline/plugins/colors/sea_ice.ice_conc.txt b/pipeline/plugins/colors/sea_ice.ice_conc.txt
new file mode 100644
index 000000000..f26429e36
--- /dev/null
+++ b/pipeline/plugins/colors/sea_ice.ice_conc.txt
@@ -0,0 +1,3 @@
+10000 255 255 255 255
+1500 255 255 255 0
+nv 0 0 0 0