feat(pipeline): add sea surface salinity dag
Andreas Helms authored and Andreas Helms committed May 28, 2024
1 parent c864aff commit 487983a
Showing 6 changed files with 98 additions and 22 deletions.
29 changes: 29 additions & 0 deletions data/downloads/odp-ftp-sea-surface-salinity.py
@@ -0,0 +1,29 @@
from xcube.core.store import new_data_store
from datetime import timedelta, datetime
from dateutil.relativedelta import relativedelta

cci_store = new_data_store('esa-cci')

start_date = '2010-01-01'
end_date = '2022-10-15'
number_of_timestamps = 307

cube = cci_store.open_data('esacci.SEASURFACESALINITY.15-days.L4.SSS.multi-sensor.multi-platform.GLOBAL-MERGED_OI_Monthly_CENTRED_15Day_0-25deg.4-41.r1',
variable_names=['sss'],
time_range=[start_date, end_date])

s = start_date + "T12:00:00.000Z"
f = "%Y-%m-%dT%H:%M:%S.%fZ"
date = datetime.strptime(s, f)

for index in range(number_of_timestamps):

    if index:
        if index % 2:
            # 15th of the month
            date = date + timedelta(days=14)
        else:
            # First of the month
            date = date + timedelta(days=-14) + relativedelta(months=+1)

    # write one NetCDF file per timestamp, named YYYYMMDD.nc
    cube.sss.sel(time=date, method='nearest').to_netcdf(path='./' + date.strftime("%Y%m%d") + '.nc')
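Note (not part of the commit): the loop alternates between stepping forward 14 days (to the 15th) and stepping back 14 days while advancing one month (to the 1st of the next month), so it emits two timestamps per month. A minimal sketch of the first few generated dates, assuming the same start date and stepping rule as above:

from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

date = datetime.strptime("2010-01-01T12:00:00.000Z", "%Y-%m-%dT%H:%M:%S.%fZ")
for index in range(5):
    if index:
        if index % 2:
            date = date + timedelta(days=14)                              # 15th of the month
        else:
            date = date + timedelta(days=-14) + relativedelta(months=+1)  # 1st of the next month
    print(date.strftime("%Y%m%d"))
# prints: 20100101, 20100115, 20100201, 20100215, 20100301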
21 changes: 0 additions & 21 deletions data/downloads/odp-ftp-sea-surface-salinity.sh

This file was deleted.

68 changes: 68 additions & 0 deletions pipeline/dags/sea_surface_salinity_sss.py
@@ -0,0 +1,68 @@
from datetime import datetime
import task_factories
from airflow import DAG
from airflow.models.param import Param
from helper import get_default_layer_version

# layer
LAYER_ID = 'sea_surface_salinity'
LAYER_VARIABLE = 'sss'
METADATA = {
    "id": f'{LAYER_ID}.{LAYER_VARIABLE}',
    "timestamps": [],  # will be injected
    "min_value": 30,
    "max_value": 40,
    "type": "image",  # 'tiles' or 'image'
    "zoom_levels": '0-3',
    "units": "PSU",
    "colorMap": "custom",
    "basemap": "ocean",
    "time_format": {
        "year": "numeric",
        "month": "long",
        "day": "numeric"
    }
}

# dev
BUCKET_ORIGIN = 'esa-cfs-cate-data'
BUCKET_TMP = 'esa-cfs-pipeline-tmp'
WORKDIR = '/workdir/files'
COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt'
DEBUG = False

default_layer_version = get_default_layer_version()
dag_params = {
    "max_files": Param(2, type=["null", "integer"], minimum=0),
    "output_bucket": Param("esa-cfs-pipeline-output", type=["string"], enum=['esa-cfs-pipeline-output', 'esa-cfs-tiles']),
    "skip_downloads": Param(False, type="boolean"),
    "layer_version": Param(default_layer_version, type="string")
}

with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:

    # create tasks
    clean_workdir = task_factories.clean_dir_skippable(
        task_id='clean_workdir', dir=WORKDIR)()
    list_files = task_factories.gcs_list_files(
        bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
    download = task_factories.gcs_download_file(
        bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded')
    legend_image = task_factories.legend_image(
        workdir=WORKDIR, color_file=COLOR_FILE)
    metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
    gdal_transforms = task_factories.gdal_transforms(
        layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'])
    upload = task_factories.upload(
        WORKDIR, LAYER_ID, LAYER_VARIABLE, METADATA['type'])

    # connect tasks
    files = list_files()
    clean_workdir >> files
    downloads = download.expand(filename=files)
    gdal_transforms(downloads) >> upload()
    clean_workdir >> legend_image
    metadata(files)

    if DEBUG:
        downloads >> task_factories.gdal_info()
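For context (not part of the diff): `download.expand(filename=files)` relies on Airflow's dynamic task mapping, which creates one mapped `download` task instance per filename returned by `list_files` at runtime. A minimal, self-contained sketch of the same pattern, with hypothetical task names and file list:

from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id="dynamic_mapping_example", start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def dynamic_mapping_example():

    @task
    def list_files():
        # Stand-in for the GCS listing done by task_factories.gcs_list_files.
        return ["20100101.nc", "20100115.nc"]

    @task
    def download(filename: str):
        # Stand-in for task_factories.gcs_download_file.
        return f"downloaded {filename}"

    # One mapped task instance is created per filename at runtime.
    download.expand(filename=list_files())

dynamic_mapping_example()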
2 changes: 1 addition & 1 deletion pipeline/dags/task_factories.py
@@ -48,7 +48,7 @@ def gcs_list_files(bucket_name: str, layer_id: str, layer_variable: str, task_id
    def fn(**context):
        max_files = context["params"]["max_files"]
        hook = GCSHook('google')
-       subdir_path = f'{context["params"]["input_bucket_subdir"]}/' if context["params"]["input_bucket_subdir"] else ''
+       subdir_path = f'{context["params"]["input_bucket_subdir"]}/' if "input_bucket_subdir" in context["params"] else ''
        filenames = hook.list(
            bucket_name, match_glob=f'{layer_id}.{layer_variable}/{subdir_path}*.nc')
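The one-line change above swaps a truthiness check on `context["params"]["input_bucket_subdir"]` for a membership check, so the prefix is only built when the param exists and the task no longer fails with a KeyError in DAGs, such as this new one, whose params do not define `input_bucket_subdir`. A small illustration of the new guard, using a plain dict in place of `context["params"]` and a hypothetical subdirectory value:

def subdir_path(params: dict) -> str:
    # Same guard as the updated line in gcs_list_files.
    return f'{params["input_bucket_subdir"]}/' if "input_bucket_subdir" in params else ''

print(subdir_path({"max_files": 2}))                                 # '' -> no KeyError
print(subdir_path({"max_files": 2, "input_bucket_subdir": "2024"}))  # '2024/'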

File renamed without changes.
