feat: sea_ice
pwambach authored and andreashelms committed Feb 5, 2024
1 parent 0fe6a9b commit f26e68d
Showing 6 changed files with 169 additions and 23 deletions.
2 changes: 1 addition & 1 deletion pipeline/dags/cloud.cfc.py
@@ -43,7 +43,7 @@
     clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
-    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
     upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
2 changes: 1 addition & 1 deletion pipeline/dags/land_cover.lccs_class.py
@@ -45,7 +45,7 @@
     clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR, dry_run=dry_run)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=dry_run)
-    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_warp=1, max_tis_dem=1, max_tis_translate=1)
     upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
6 changes: 2 additions & 4 deletions pipeline/dags/permafrost.pfr.py
@@ -43,18 +43,16 @@
     clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
     download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
-    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
+    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
-    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE.upper(), min_value=METADATA['min_value'], max_value=METADATA['min_value'])
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE.upper(), color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
     upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])

     # connect tasks
     files = list_files()
     clean_workdir >> files
     downloads = download.expand(filename=files)
-    clamps = clamp_netcdf.expand(filename=downloads)
-    gdal_transforms(clamps) >> upload()
+    gdal_transforms(downloads) >> upload()
     clean_workdir >> legend_image
     metadata(files)

144 changes: 144 additions & 0 deletions pipeline/dags/sea_ice.ice_conc.py
@@ -0,0 +1,144 @@
from datetime import datetime
import task_factories
import helper
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash import BashOperator

# layer
LAYER_ID = 'sea_ice'
LAYER_VARIABLE = 'ice_conc'
LAYER_VERSION = '1.2.3'
RESOLUTION = '2444 496'
METADATA = {
"id": f'{LAYER_ID}.{LAYER_VARIABLE}',
"version": LAYER_VERSION,
"timestamps": [], # will be injected
"min_value": 0,
"max_value": 100,
"type": "tiles", # 'tiles' or 'image'
"zoom_levels": '0-3',
"units": 'm',
"basemap": 'blue',
"legend_values": ["100 %", "0"],
"time_format": {
"year": "numeric",
"month": "long"
}
}

# dev
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
WORKDIR = '/workdir/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
DEBUG=False

dag_params = {
"max_files": Param(1, type="integer", minimum=0)
}
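
# "max_files" is presumably read inside task_factories.gcs_list_files (not
# shown in this diff) to cap how many of the listed files a run maps over.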


with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:

    dry_run=False

    # create tasks
    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR, dry_run=dry_run)

    list_files_nh = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID + '_nh', layer_variable=LAYER_VARIABLE)
    download_nh = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_nh_downloaded', task_id='download_file_nh', dry_run=dry_run)

    list_files_sh = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID + '_sh', layer_variable=LAYER_VARIABLE)
    download_sh = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_sh_downloaded', task_id='download_file_sh', dry_run=dry_run)

    legend_image = task_factories.legend_image(workdir=WORKDIR, color_file=COLOR_FILE)
    metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
    gdal_te_nh = "-180 16.62393 180 90"
    gdal_te_sh = "-180 -90 180 -16.62393"
    warp_command_nh = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te_nh} -ts {RESOLUTION} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT'
    warp_command_sh = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te_sh} -ts {RESOLUTION} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT'

    gdal_warp_nh = BashOperator.partial(
        task_id='reproject_and_to_tiff_nh',
        bash_command=f'rm -f $FILEPATH_OUT && {warp_command_nh} && echo $FILEPATH_OUT'
    )
    gdal_warp_sh = BashOperator.partial(
        task_id='reproject_and_to_tiff_sh',
        bash_command=f'rm -f $FILEPATH_OUT && {warp_command_sh} && echo $FILEPATH_OUT'
    )

    gdal_merge = BashOperator.partial(
        task_id='merge_tiffs',
        bash_command=f'rm -f $FILEPATH_OUT && gdal_merge.py -o $FILEPATH_OUT $FILEPATH_IN_NH $FILEPATH_IN_SH && echo $FILEPATH_OUT'
    )

    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])


    gdal_dem = BashOperator.partial(
        task_id='gdal_dem',
        bash_command='rm -f $FILEPATH_OUT && gdaldem color-relief $FILEPATH_IN $COLOR_FILE --config GDAL_CACHEMAX 90% -co compress=LZW -alpha $FILEPATH_OUT && echo $FILEPATH_OUT'
    )

    gdal_translate = BashOperator.partial(
        task_id='gdal_translate_tiles',
        bash_command=f'rm -rf $FILEPATH_OUT && gdal2tiles.py --profile geodetic --zoom={METADATA["zoom_levels"]} --tmscompatible --no-kml --webviewer=none --resampling average --s_srs EPSG:4326 $FILEPATH_IN $FILEPATH_OUT && echo $FILEPATH_OUT'
    )
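
    # gdal2tiles.py emits a directory tree of tiles rather than a single
    # file, hence the rm -rf; its FILEPATH_OUT below is the input filename
    # with appendix and suffix stripped.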


    # connect tasks
    files_nh = list_files_nh()
    files_sh = list_files_sh()

    clean_workdir >> files_nh
    clean_workdir >> files_sh

    downloads_nh = download_nh.expand(filename=files_nh)
    downloads_sh = download_sh.expand(filename=files_sh)

    gdal_warps_nh = gdal_warp_nh.expand(
        env=downloads_nh.map(lambda filename: {
            "FILEPATH_IN": filename,
            "FILEPATH_OUT": helper.change_filename(filename, appendix='_nh_warped', suffix='tiff'),
            "DATA_VARIABLE": LAYER_VARIABLE
        })
    )

    gdal_warps_sh = gdal_warp_sh.expand(
        env=downloads_sh.map(lambda filename: {
            "FILEPATH_IN": filename,
            "FILEPATH_OUT": helper.change_filename(filename, appendix='_sh_warped', suffix='tiff'),
            "DATA_VARIABLE": LAYER_VARIABLE
        })
    )

    gdal_merges = gdal_merge.expand(
        env=gdal_warps_sh.output.map(lambda filename: {
            "FILEPATH_OUT": helper.change_filename(filename, appendix='_merged', suffix='tiff'),
            "FILEPATH_IN_NH": helper.change_filename(filename, appendix='_nh_warped', suffix='tiff'),
            "FILEPATH_IN_SH": helper.change_filename(filename, appendix='_sh_warped', suffix='tiff')
        })
    )
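
    # Both gdal_merge inputs are derived from the _sh_warped filename by
    # swapping its appendix, which only lines up if helper.change_filename
    # replaces an existing appendix rather than stacking a new one (see the
    # sketch after this listing).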

    gdal_dems = gdal_dem.expand(
        env=gdal_merges.output.map(lambda filename: {
            "FILEPATH_IN": filename,
            "FILEPATH_OUT": helper.change_filename(filename, appendix='colored'),
            "COLOR_FILE": COLOR_FILE
        })
    )

    gdal_translates = gdal_translate.expand(
        env=gdal_dems.output.map(lambda filename: {
            "FILEPATH_IN": filename,
            "FILEPATH_OUT": helper.change_filename(filename, appendix='', remove_suffix=True)
        })
    )

    gdal_translates >> upload()

    clean_workdir >> legend_image
    metadata(files_nh) # only needs one set of images
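
Note on the filename arithmetic above: the DAG leans on helper.change_filename, which is not part of this diff. Below is a minimal sketch of how it plausibly behaves, inferred only from the call sites in this file; the real helper module may differ, and the appendix list and replacement rule are assumptions.

from pathlib import Path

# Hypothetical reconstruction of helper.change_filename (an assumption, not
# the project's code): an existing appendix is replaced rather than stacked,
# and suffix/remove_suffix control the file extension.
KNOWN_APPENDIXES = ('_nh_downloaded', '_sh_downloaded', '_downloaded',
                    '_nh_warped', '_sh_warped', '_merged', 'colored')

def change_filename(filename, appendix='', suffix=None, remove_suffix=False):
    path = Path(filename)
    stem = path.stem
    for known in KNOWN_APPENDIXES:  # strip a previous appendix, if present
        if stem.endswith(known):
            stem = stem[:len(stem) - len(known)]
            break
    if remove_suffix:
        ext = ''
    elif suffix is not None:
        ext = '.' + suffix
    else:
        ext = path.suffix
    return str(path.with_name(stem + appendix + ext))

# e.g. change_filename('/workdir/files/a_sh_warped.tiff', appendix='_nh_warped', suffix='tiff')
# -> '/workdir/files/a_nh_warped.tiff'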
35 changes: 18 additions & 17 deletions pipeline/dags/task_factories.py
@@ -33,8 +33,8 @@ def fn(**context):
     return fn


-def gcs_download_file(bucket_name: str, dir: str, appendix: str = '', dry_run=False):
-    @task(task_id='gcs_download_file' if not dry_run else 'gcs_download_file_dry_run')
+def gcs_download_file(bucket_name: str, dir: str, appendix: str = '', dry_run=False, task_id="gcs_download_file"):
+    @task(task_id=task_id if not dry_run else 'gcs_download_file_dry_run')
     def fn(filename: str):
         hook = GCSHook('google')
         local_filename = dir + '/' + \
@@ -77,7 +77,7 @@ def gdal_info():
     )


-def legend_image(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, color_file: str):
+def legend_image(workdir: str, color_file: str):
     return BashOperator(
         task_id='legend_image',
         bash_command=f'rm -f $FILEPATH_OUT && node /opt/airflow/plugins/generate-legend-image.js && echo $FILEPATH_OUT',
@@ -103,6 +103,7 @@ def fn(files):
"units": extended_metadata['units'],
"legendValues": extended_metadata['legend_values'],
"timeFormat": extended_metadata['time_format'],
"basemap": extended_metadata['basemap']
}
filepath = str(Path(workdir).joinpath('metadata.json'))

@@ -232,17 +232,17 @@ def fn():
     return fn


-def clamp_netcdf(layer_variable: str, min_value: float, max_value: float):
-    @task(task_id='clamp_netcdf')
-    def fn(filename):
-        import xarray as xr
-        print(filename)
-        ds = xr.open_dataset(filename)
-        ds[layer_variable] = ds[layer_variable].clip(min_value, max_value)
-        new_filename = helper.change_filename(filename, appendix='clamped')
-        encoding = {var: {"zlib": True, "complevel": 9} for var in ds.data_vars}
-        ds.to_netcdf(new_filename, engine='netcdf4', encoding=encoding)
-        ds.close()
-        return new_filename
-
-    return fn
+# def clamp_netcdf(layer_variable: str, min_value: float, max_value: float):
+#     @task(task_id='clamp_netcdf')
+#     def fn(filename):
+#         import xarray as xr
+#         print(filename)
+#         ds = xr.open_dataset(filename)
+#         ds[layer_variable] = ds[layer_variable].clip(min_value, max_value)
+#         new_filename = helper.change_filename(filename, appendix='clamped')
+#         encoding = {var: {"zlib": True, "complevel": 9} for var in ds.data_vars}
+#         ds.to_netcdf(new_filename, engine='netcdf4', encoding=encoding)
+#         ds.close()
+#         return new_filename
+
+#     return fn
3 changes: 3 additions & 0 deletions pipeline/plugins/colors/sea_ice.ice_conc.txt
@@ -0,0 +1,3 @@
10000 255 255 255 255
1500 255 255 255 0
nv 0 0 0 0
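
These three rules use gdaldem color-relief's plain-text format: one "value R G B A" rule per line, plus "nv" for nodata (the -alpha flag on the gdaldem task above makes the fourth column an alpha channel). The magnitudes suggest ice_conc arrives packed in hundredths of a percent (10000 = 100 %), with 1500 presumably the conventional 15 % ice-edge threshold below which pixels turn fully transparent. Between two stops gdaldem interpolates each channel linearly; a rough sketch of that behavior (my reading of GDAL's documented default, not project code):

def ice_conc_rgba(value):
    """Interpolate RGBA between the two stops above, as gdaldem
    color-relief does by default for values between entries."""
    lo, lo_rgba = 1500, (255, 255, 255, 0)      # 15 %: transparent white
    hi, hi_rgba = 10000, (255, 255, 255, 255)   # 100 %: opaque white
    if value <= lo:
        return lo_rgba
    if value >= hi:
        return hi_rgba
    t = (value - lo) / (hi - lo)
    return tuple(round(a + t * (b - a)) for a, b in zip(lo_rgba, hi_rgba))

print(ice_conc_rgba(5750))  # midway between the stops -> (255, 255, 255, 128)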
