updates
pwambach authored and andreashelms committed Feb 5, 2024
1 parent 8744c70 commit 0fe6a9b
Showing 4 changed files with 10 additions and 14 deletions.
8 changes: 3 additions & 5 deletions pipeline/dags/cloud.cfc.py
@@ -37,24 +37,22 @@
    "max_files": Param(2, type="integer", minimum=0)
}

-with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:
+with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:

    # create tasks
    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
    list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
-    metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
-    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE, min_value=METADATA['min_value'], max_value=METADATA['max_value'])
+    metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION)
    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])

    # connect tasks
    files = list_files()
    clean_workdir >> files
    downloads = download.expand(filename=files)
-    clamps = clamp_netcdf.expand(filename=downloads)
-    gdal_transforms(clamps) >> upload()
+    gdal_transforms(downloads) >> upload()
    clean_workdir >> legend_image
    metadata(files)

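Note on the recurring change in this commit: all three DAG files swap the deprecated schedule_interval argument for Airflow's unified schedule parameter (introduced in Airflow 2.4); schedule=None means the DAG never runs on a timetable and only executes when triggered manually or via the API. A minimal sketch of the new style, with a hypothetical dag_id, not code from this repository:

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="manual_only_example",     # hypothetical id, for illustration only
    start_date=datetime(2022, 1, 1),
    schedule=None,                    # was: schedule_interval=timedelta(days=1)
    catchup=False,
) as dag:
    EmptyOperator(task_id="noop")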
10 changes: 4 additions & 6 deletions pipeline/dags/land_cover.lccs_class.py
@@ -37,25 +37,23 @@
    "max_files": Param(1, type="integer", minimum=0)
}

-with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:
+with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:

-    dry_run=True
+    dry_run=False

    # create tasks
    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR, dry_run=dry_run)
    list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=dry_run)
    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
-    metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
-    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE, min_value=METADATA['min_value'], max_value=METADATA['min_value'])
-    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
+    metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
+    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE, color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_warp=1, max_tis_dem=1, max_tis_translate=1)
    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])

    # connect tasks
    files = list_files()
    clean_workdir >> files
    downloads = download.expand(filename=files)
-    # clamps = clamp_netcdf.expand(filename=downloads)
    gdal_transforms(downloads) >> upload()
    clean_workdir >> legend_image
    metadata(files)
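The connect-tasks block in these DAGs relies on Airflow's dynamic task mapping (available since Airflow 2.3): download.expand(filename=files) creates one mapped task instance per file returned by list_files at run time, so the fan-out width follows the bucket contents. A self-contained sketch of the pattern, with placeholder task bodies that are not from this repository:

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="mapping_example", start_date=datetime(2022, 1, 1), schedule=None, catchup=False):

    @task
    def list_files():
        return ["a.nc", "b.nc"]  # placeholder file list

    @task
    def download(filename: str):
        # placeholder body; the real factory downloads from GCS into WORKDIR
        return f"/tmp/workdir/{filename}_downloaded"

    # one mapped task instance per element of list_files()'s return value
    downloads = download.expand(filename=list_files())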
4 changes: 2 additions & 2 deletions pipeline/dags/permafrost.pfr.py
@@ -37,14 +37,14 @@
    "max_files": Param(1, type="integer", minimum=0)
}

-with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=1), catchup=False, params=dag_params) as dag:
+with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag:

    # create tasks
    clean_workdir = task_factories.clean_dir(task_id='clean_workdir', dir=WORKDIR)
    list_files = task_factories.gcs_list_files(bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE)
    download = task_factories.gcs_download_file(bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded', dry_run=False)
    legend_image = task_factories.legend_image(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, color_file=COLOR_FILE)
-    metadata = task_factories.metadata(bucket_name=BUCKET_OUTPUT, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE, layer_version=LAYER_VERSION, workdir=WORKDIR, metadata=METADATA)
+    metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA)
    clamp_netcdf = task_factories.clamp_netcdf(layer_variable=LAYER_VARIABLE.upper(), min_value=METADATA['min_value'], max_value=METADATA['min_value'])
    gdal_transforms = task_factories.gdal_transforms(layer_variable=LAYER_VARIABLE.upper(), color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION, max_tis_dem=1, max_tis_translate=1)
    upload = task_factories.upload(BUCKET_OUTPUT, WORKDIR, LAYER_ID, LAYER_VARIABLE, LAYER_VERSION, METADATA['type'])
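The max_tis_warp, max_tis_dem, and max_tis_translate arguments are parameters of the gdal_transforms factory defined in task_factories.py, which this diff does not show; presumably they cap how many mapped GDAL task instances run in parallel, which stock Airflow expresses per task as max_active_tis_per_dag. A hedged sketch of that wiring, assuming the factory simply forwards the value:

from airflow.decorators import task

def gdal_dem(max_tis_dem: int = 1):
    # assumption: the repo's factory forwards its max_tis_* arguments to
    # Airflow's max_active_tis_per_dag, limiting concurrent mapped instances
    @task(task_id="gdal_dem", max_active_tis_per_dag=max_tis_dem)
    def fn(filename: str):
        # the real task shells out to gdaldem here (omitted)
        return filename
    return fn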
2 changes: 1 addition & 1 deletion pipeline/dags/task_factories.py
@@ -87,7 +87,7 @@ def legend_image(bucket_name: str, layer_id: str, layer_variable: str, layer_ver
# generate timestamps array from filenames and write metadata to json file then upload to gcs


-def metadata(bucket_name: str, layer_id: str, layer_variable: str, layer_version: str, workdir: str, metadata: dict):
+def metadata(workdir: str, metadata: dict):
    @task(task_id='metadata')
    def fn(files):
        timestamps = list(map(helper.filename_to_date, files))
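For context on the task_factories.py change: the metadata factory drops its bucket and layer arguments and now needs only the workdir and the metadata dict, matching the simplified call sites in the three DAGs above. Only the lines down to the timestamps assignment are visible in this hunk; the rest of the body below is a hedged reconstruction (the JSON filename and the dropped upload are assumptions), with helper being the repo's own module:

import json
from airflow.decorators import task

def metadata(workdir: str, metadata: dict):
    @task(task_id='metadata')
    def fn(files):
        # shown in the diff: derive one timestamp per input filename
        timestamps = list(map(helper.filename_to_date, files))  # helper: repo module, import omitted
        # assumed continuation: write the combined metadata JSON into the workdir
        with open(f"{workdir}/metadata.json", "w") as f:
            json.dump({**metadata, "timestamps": timestamps}, f)
    return fn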
