feat: land cover
pwambach authored and andreashelms committed Feb 5, 2024
1 parent 46d2d8f commit 8744c70
Showing 9 changed files with 273 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -11,5 +11,6 @@
 assets/legend-images/
 
 pipeline/logs
+pipeline/workdir
 pipeline/*/__pycache__
 service-account.json
4 changes: 4 additions & 0 deletions pipeline/Dockerfile
@@ -13,4 +13,8 @@ RUN sudo npm install -g canvas
 
 USER airflow
 
+# xarray
+RUN pip install xarray netcdf4 h5netcdf
+
+
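The pip line gives the image a NetCDF-capable xarray stack; netcdf4 and h5netcdf are the two I/O engines xarray can dispatch to when opening files. A minimal smoke test of the kind the DAGs below depend on (the granule path is hypothetical):

import xarray as xr

# Open a downloaded granule with an explicit engine; without the
# engine argument xarray picks any installed engine that matches.
ds = xr.open_dataset('/workdir/files/20100101_downloaded.nc', engine='netcdf4')
print(ds.data_vars)  # e.g. lccs_class or PFR, depending on the layer
ds.close()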
15 changes: 9 additions & 6 deletions pipeline/dags/cloud.cfc.py
@@ -10,9 +10,10 @@
 RESOLUTION = '1024 512'
 METADATA = {
     "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
+    "version": LAYER_VERSION,
     "timestamps": [], # will be injected
     "min_value": 0,
-    "max_value": 10,
+    "max_value": 1,
     "type": "image", # 'tiles' or 'image'
     "zoom_levels": '0-2',
     "units": 'mg/m2',
@@ -28,7 +29,7 @@
 BUCKET_ORIGIN = 'esa-cfs-cate-data'
 BUCKET_TMP = 'esa-cfs-pipeline-tmp'
 BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
-WORKDIR = '/tmp/files'
+WORKDIR = '/workdir/files'
 COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
 DEBUG=False
 
@@ -39,19 +40,21 @@
 with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:
 
     # create tasks
-    clean_workdir = task_factories.clean_workdir(workdir=WORKDIR)
+    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
-    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
+    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
     legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
+    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE, min_value=METADATA['min_value'], max_value=METADATA['max_value'])
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
-    upload = task_factories.gcloud_upload_dir(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, directory=WORKDIR)
+    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
 
     # connect tasks
     files = list_files()
     clean_workdir >> files
     downloads = download.expand(filename=files)
-    gdal_transforms(downloads) >> upload
+    clamps = clamp_netcdf.expand(filename=downloads)
+    gdal_transforms(clamps) >> upload()
     clean_workdir >> legend_image
     metadata(files)
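clamp_netcdf is wired into the DAG here, but its factory body is not part of this diff. A minimal sketch of what it might look like, assuming an Airflow TaskFlow task plus the xarray stack installed in the Dockerfile; the '_clamped' filename appendix is a guess:

from airflow.decorators import task
import xarray as xr

def clamp_netcdf(layer_variable: str, min_value: float, max_value: float):
    # Return the task uncalled so the DAG can map it: clamp_netcdf.expand(filename=...)
    @task(task_id='clamp_netcdf')
    def clamp(filename: str) -> str:
        ds = xr.open_dataset(filename)
        # Clip the variable into [min_value, max_value] so the later gdal
        # color mapping never sees out-of-range values.
        ds[layer_variable] = ds[layer_variable].clip(min=min_value, max=max_value)
        out_file = filename.replace('_downloaded', '_clamped')  # hypothetical naming
        ds.to_netcdf(out_file)
        return out_file
    return clamp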
6 changes: 5 additions & 1 deletion pipeline/dags/helper.py
@@ -12,10 +12,14 @@ def change_filename(filepath: str, appendix: str = '', suffix: str = '', remove_
     name = p.name.replace(p.suffix, '').split('_')[0]
     new_suffix = suffix or p.suffix if not remove_suffix else ''
 
-    return p.with_name(name + appendix + new_suffix)
+    return str(p.with_name(name + appendix + new_suffix))
 
 def filename_to_date(filename: str):
     date_string = filename.split('/')[-1].replace('.nc', '')
     return datetime.strptime(date_string, "%Y%m%d").isoformat() + 'Z'
+
+def date_to_filename(date_string: str):
+    return datetime.fromisoformat(date_string.replace('Z', '')).strftime('%Y%m%d')
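date_to_filename is the inverse of the existing filename_to_date, letting injected metadata timestamps be mapped back to source files. A quick round trip (the path is made up):

filename_to_date('land_cover/lccs_class/20100101.nc')  # -> '2010-01-01T00:00:00Z'
date_to_filename('2010-01-01T00:00:00Z')               # -> '20100101'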



64 changes: 64 additions & 0 deletions pipeline/dags/land_cover.lccs_class.py
@@ -0,0 +1,64 @@
from datetime import datetime, timedelta
import task_factories
from airflow import DAG
from airflow.models.param import Param

# layer
LAYER_ID = 'land_cover'
LAYER_VARIABLE = 'lccs_class'
LAYER_VERSION = '1.2.3'
RESOLUTION = '64800 32400'
METADATA = {
"id": f'{LAYER_ID}.{LAYER_VARIABLE}',
"version": LAYER_VERSION,
"timestamps": [], # will be injected
"min_value": 0,
"max_value": 220,
"type": "tiles", # 'tiles' or 'image'
"zoom_levels": '0-5',
"units": 'm',
"basemap": 'blue',
"legend_values": ["100 %", "0"],
"time_format": {
"year": "numeric",
"month": "long"
}
}

# dev
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
WORKDIR = '/workdir/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
DEBUG=False

dag_params = {
"max_files": Param(1, type="integer", minimum=0)
}

with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:

    dry_run = True

    # create tasks
    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR, dry_run=dry_run)
    list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=dry_run)
    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
    metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE, min_value=METADATA['min_value'], max_value=METADATA['max_value'])
    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])

    # connect tasks
    files = list_files()
    clean_workdir >> files
    downloads = download.expand(filename=files)
    # clamps = clamp_netcdf.expand(filename=downloads)
    gdal_transforms(downloads) >> upload()
    clean_workdir >> legend_image
    metadata(files)

    if DEBUG:
        downloads >> task_factories.gdal_info()
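This DAG pins dry_run=True while the full-resolution 64800 32400 grid is being tested, so clean_dir and gcs_download_file are expected to skip their side effects. Those factories are not in this diff; a sketch of how clean_dir might honor the flag (all names are assumptions):

from airflow.decorators import task
import os
import shutil

def clean_dir(task_id: str, dir: str, dry_run: bool = False):
    @task(task_id=task_id)
    def clean():
        if dry_run:
            print(f'[dry run] would empty {dir}')
            return
        shutil.rmtree(dir, ignore_errors=True)  # drop the previous run's files
        os.makedirs(dir, exist_ok=True)         # recreate the empty workdir
    # Instantiate here so the DAG can wire the instance: clean_workdir >> files
    return clean()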
24 changes: 12 additions & 12 deletions pipeline/dags/permafrost.pfr.py
@@ -7,15 +7,16 @@
 LAYER_ID = 'permafrost'
 LAYER_VARIABLE = 'pfr'
 LAYER_VERSION = '1.2.3'
-RESOLUTION = '36366 18182'
+RESOLUTION = '2048 1024'
 METADATA = {
     "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
+    "version": LAYER_VERSION,
     "timestamps": [], # will be injected
     "min_value": 0,
-    "max_value": 10,
+    "max_value": 100,
     "type": "tiles", # 'tiles' or 'image'
     "zoom_levels": '0-2',
-    "units": 'xxx',
+    "units": 'm',
     "basemap": 'blue',
     "legend_values": ["100 %", "0"],
     "time_format": {
@@ -28,7 +29,7 @@
 BUCKET_ORIGIN = 'esa-cfs-cate-data'
 BUCKET_TMP = 'esa-cfs-pipeline-tmp'
 BUCKET_OUTPUT = 'esa-cfs-pipeline-output'
-WORKDIR = '/tmp/files'
+WORKDIR = '/workdir/files'
 COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
 DEBUG=False
 
@@ -39,24 +40,23 @@
 with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:
 
     # create tasks
-    clean_workdir = task_factories.clean_workdir(workdir=WORKDIR)
+    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
     list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
-    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
+    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
     legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
     metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
+    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE.upper(), min_value=METADATA['min_value'], max_value=METADATA['max_value'])
     gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE.upper(), color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
-    upload = task_factories.gcloud_upload_dir(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, directory=WORKDIR)
-
+    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
     # connect tasks
     files = list_files()
     clean_workdir >> files
     downloads = download.expand(filename=files)
-    gdal_transforms(downloads) >> upload
+    clamps = clamp_netcdf.expand(filename=downloads)
+    gdal_transforms(clamps) >> upload()
     clean_workdir >> legend_image
     metadata(files)
 
     if DEBUG:
         downloads >> task_factories.gdal_info()
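Both reworked DAGs and the new land cover DAG now share task_factories.upload, which receives the layer type so tiles and single images can land under different paths. Its body is not in this commit; a hedged sketch using the google-cloud-storage client, with an assumed object layout:

from pathlib import Path
from airflow.decorators import task
from google.cloud import storage

def upload(bucket_name, directory, layer_id, layer_variable, layer_version, layer_type):
    @task(task_id='upload')
    def _upload():
        bucket = storage.Client().bucket(bucket_name)
        # Assumed prefix layout; the real naming convention is not visible here.
        prefix = f'{layer_id}.{layer_variable}/{layer_version}/{layer_type}'
        for path in Path(directory).rglob('*'):
            if path.is_file():
                blob = bucket.blob(f'{prefix}/{path.relative_to(directory)}')
                blob.upload_from_filename(str(path))
    # Returned uncalled: the DAGs instantiate it as upload() when wiring.
    return _upload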


