From cbcaa84b5213359935069ffdf55f51a93717af20 Mon Sep 17 00:00:00 2001 From: Andreas Helms Date: Wed, 28 Feb 2024 16:36:31 +0100 Subject: [PATCH] feat(pipeline): vegetation lai --- pipeline/dags/task_factories.py | 9 ++- pipeline/dags/vegetation.lai.py | 69 ++++++++++++++++++ pipeline/plugins/colors/vegetation.lai.txt | 6 ++ .../plugins/layer-icons/vegetation.lai.png | Bin 0 -> 4514 bytes 4 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 pipeline/dags/vegetation.lai.py create mode 100644 pipeline/plugins/colors/vegetation.lai.txt create mode 100644 pipeline/plugins/layer-icons/vegetation.lai.png diff --git a/pipeline/dags/task_factories.py b/pipeline/dags/task_factories.py index 3bf18cf6a..e4e88e01c 100644 --- a/pipeline/dags/task_factories.py +++ b/pipeline/dags/task_factories.py @@ -89,10 +89,10 @@ def fn(filename: str): def gcloud_upload_dir(layer_id: str, layer_variable: str, directory: str): return BashOperator( task_id='gcloud_upload', - bash_command='gcloud auth activate-service-account --key-file $KEY_FILE && gsutil -q -m cp -r $UPLOAD_DIR/* $BUCKET', + bash_command='gcloud auth activate-service-account --key-file $KEY_FILE && gsutil -m -h "Cache-Control:no-cache" rsync -d -r $UPLOAD_DIR $BUCKET', env={ "UPLOAD_DIR": directory, - "BUCKET": 'gs://{{ dag_run.conf["output_bucket"] }}/{{ dag_run.conf["layer_version"] }}/' + f'{layer_id}.{layer_variable}/', + "BUCKET": 'gs://{{ dag_run.conf["output_bucket"] }}/{{ dag_run.conf["layer_version"] }}/' + f'{layer_id}.{layer_variable}', "KEY_FILE": '/opt/airflow/plugins/service-account.json', "CLOUDSDK_PYTHON": '/usr/local/bin/python' } @@ -155,7 +155,7 @@ def fn(files, **context): return fn -def gdal_transforms(layer_variable: str, color_file: str, layer_type: str, zoom_levels: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512', warp_cmd: str = None, max_tis_warp: int = 4, max_tis_dem: int = 4, max_tis_translate: int = 4): +def gdal_transforms(color_file: str, layer_type: str, zoom_levels: str, gdal_te: str = '-180 -90 180 90', gdal_ts: str = '1024 512', layer_variable: str = '', warp_cmd: str = None, max_tis_warp: int = 4, max_tis_dem: int = 4, max_tis_translate: int = 4): def get_transform_task(): if layer_type == 'image': return BashOperator.partial( @@ -178,7 +178,8 @@ def get_transform_outpath(filename): @task_group(group_id='gdal_transforms_group') def fn(downloads): - warp_command = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW NETCDF:"$FILEPATH_IN":$DATA_VARIABLE $FILEPATH_OUT' if not warp_cmd else warp_cmd + file_path_in = 'NETCDF:"$FILEPATH_IN":$DATA_VARIABLE' if layer_variable else '$FILEPATH_IN' + warp_command = f'gdalwarp -t_srs EPSG:4326 -te {gdal_te} -ts {gdal_ts} -r near --config GDAL_CACHEMAX 90% -co compress=LZW {file_path_in} $FILEPATH_OUT' if not warp_cmd else warp_cmd gdal_warp = BashOperator.partial( task_id='reproject_and_to_tiff', bash_command=f'rm -f $FILEPATH_OUT && {warp_command} && echo $FILEPATH_OUT', diff --git a/pipeline/dags/vegetation.lai.py b/pipeline/dags/vegetation.lai.py new file mode 100644 index 000000000..cc9f976f3 --- /dev/null +++ b/pipeline/dags/vegetation.lai.py @@ -0,0 +1,69 @@ +from datetime import datetime +import task_factories +from airflow import DAG +from airflow.models.param import Param +from helper import get_default_layer_version + +# layer +LAYER_ID = 'vegetation' +LAYER_VARIABLE = 'lai' +RESOLUTION = '2240 12320' +METADATA = { + "id": f'{LAYER_ID}.{LAYER_VARIABLE}', + "timestamps": [], # will be injected + "min_value": 0, + "max_value": 8, + "type": "tiles", # 'tiles' or 'image' + "zoom_levels": '0-7', + "units": '', + "basemap": None, + "legend_values": ["8 m²/m²", "0"], + "time_format": { + "year": "numeric", + "month": "long", + "day": "numeric" + } +} + +# dev +BUCKET_ORIGIN = 'esa-cfs-cate-data' +BUCKET_TMP = 'esa-cfs-pipeline-tmp' +WORKDIR = '/workdir/files' +COLOR_FILE = f'/opt/airflow/plugins/colors/{LAYER_ID}.{LAYER_VARIABLE}.txt' +DEBUG = False + +default_layer_version = get_default_layer_version() +dag_params = { + "max_files": Param(2, type=["null", "integer"], minimum=0,), + "output_bucket": Param("esa-cfs-pipeline-output", type=["string"], enum=['esa-cfs-pipeline-output', 'esa-cfs-tiles']), + "skip_downloads": Param(False, type="boolean"), + "layer_version": Param(default_layer_version, type="string") +} + +with DAG(dag_id=METADATA["id"], start_date=datetime(2022, 1, 1), schedule=None, catchup=False, params=dag_params) as dag: + + # create tasks + clean_workdir = task_factories.clean_dir_skippable( + task_id='clean_workdir', dir=WORKDIR)() + list_files = task_factories.gcs_list_files( + bucket_name=BUCKET_ORIGIN, layer_id=LAYER_ID, layer_variable=LAYER_VARIABLE) + download = task_factories.gcs_download_file( + bucket_name=BUCKET_ORIGIN, dir=WORKDIR, appendix='_downloaded') + legend_image = task_factories.legend_image( + workdir=WORKDIR, color_file=COLOR_FILE) + metadata = task_factories.metadata(workdir=WORKDIR, metadata=METADATA) + gdal_transforms = task_factories.gdal_transforms( + color_file=COLOR_FILE, layer_type=METADATA['type'], zoom_levels=METADATA['zoom_levels'], gdal_ts=RESOLUTION) + upload = task_factories.upload( + WORKDIR, LAYER_ID, LAYER_VARIABLE, METADATA['type']) + + # connect tasks + files = list_files() + clean_workdir >> files + downloads = download.expand(filename=files) + gdal_transforms(downloads) >> upload() + clean_workdir >> legend_image + metadata(files) + + if DEBUG: + downloads >> task_factories.gdal_info() diff --git a/pipeline/plugins/colors/vegetation.lai.txt b/pipeline/plugins/colors/vegetation.lai.txt new file mode 100644 index 000000000..9009fb106 --- /dev/null +++ b/pipeline/plugins/colors/vegetation.lai.txt @@ -0,0 +1,6 @@ +7.8 0 104 55 255 +4.28 106 191 114 255 +2.49 173 221 144 255 +0.98 224 242 178 255 +0 255 255 204 255 +nv 0 0 0 0 diff --git a/pipeline/plugins/layer-icons/vegetation.lai.png b/pipeline/plugins/layer-icons/vegetation.lai.png new file mode 100644 index 0000000000000000000000000000000000000000..f163694ea54f1461c9e98577163976d575ffde51 GIT binary patch literal 4514 zcmV;T5nb+yP)4sb>~pU=b?fP_?&ffA9Fm$L6NMC^(cAhmPbVHXjlM0fHn(o`N7R zc?^&Q$)BJAaRLOEA}XW;$U-2?jEK^>)cNqArQhrX^kr-+GqrzwMM!ws#X+5f#Z5nAh z`E*g_&sAmp2}e3-!XP+ijOiF-a2)4AN;z;G=Z2JWO}g&8ON)yaT-P1fO5^(;u9R47 zksw`nKNfd zk^~_HSFT)PeSMw4_X)y~x~>6`QUZ|Y`F_9Of5XDpilX?s*4o}r!3T-+NEbT}p65yF z$bS_C{^g~m#aD0N-uS&VogDkjXFkKT&pyk^lP5WG;sk@ifU2s{#?b5a@H~$^&l!)$ zSZg^vJnU|6ZvGudh(B&N8dt+G{MR5tZS*4LWqCcZnsYZtgo*V zMG>#O@(NKD0dV!|RlfiI@3XtROQ+L0@|XeZU=diWP)ZSoAwmdFo;*pb)j~>%5MraM zs(+KF*?%?0P}b!y9zw<#!XQX?hr9p98uR5a3^_bJP)IVT@K#ZvzSJ3Bk=jE2Wq8gG~zEd8;u`GDIZ^6US@A^k2Fo`cDqzn#n-?7b>4jQ zO~&IfQ54Z^Hqlz6wMGbmwHB=v3kyA-I(Hm5F4-Pj;>`Im&5okTp)AX(BGddrN=cSw zNGX?BSC+pY`2OFn$_k6UzX!B7ANW#hjqiJZv)yig8=%o>vbQ&6dwZKO43SduqaXbU zAp|QcD;Q%CLa@KT&(6*cQ54P5%390Pu@yR92RGU!OD*5~_7*43Mg(4mB6Dzk1-SD# zcO2(P2cKh$$6n9N;^j2WzUBK8e$Eib7{hE?YpJRl#}S|Gb$j0d*xcM?G&*2^e;>!0 zCbKbyIF5;;=$<&YxUNeSMdW$T^71mBP6wqFn_IWJa%qUIBzuE`wZA%G|R;3hDN|yWm*PG48r>d&@nS-&`;>K|dfH=CL(P;R+Zu@&-7_qgr#dtiX zs;c?vKS-2PEG{n6?RGgl95dYA04_Ff&gO-bzS58K1+*>e+KleC3RiNQ7OU^N6H&(ainCqzw}yBl->39bv8FQF~*>j z!gU{9^R(6kLBR6zGHIIe*kj8i?T8?9@S+-x=GR_a(K$K`DkaDWL z5P~er$g+&8s%FKn(d~5qR}@9~en1#bm+Kr=Mxzu9z25Ka?hd|q@!}fRT0GC2hll5R z^9uZ7;y6=GxP19C&1Qll1z~JhT#czphlK^7Uf<(xdPr?+JmcXv3mOY4`CgC7WFOxT zXti2gxNw1XyUkNiJ;h`)L2FH`mHZ}xzrVAy^R>Va0E=6c1=bkixFII#_>V4LT$>}p zz49RC0fZ1JrKWILX#(G8`CLe=D{w2H!JUHrVUA1$j*yh9rqGtKIp*5MF^7jEy1fpa zPKTA171r0+iQ|~n)m31MSia|dRXXyUwW@2!agbm7?JrSQC3RK);r715&R~m! zg9A>SI5GETx7(%NZc~;eS(f=yI%QduZ%S7pU;5%pU@b*ayjE2z@qPcHPWyhMwI+^Z zR#sMUU593pu;f3%?H^ZMdhZb9RV<#CTz)4d9cfk`YZ68-MO||DTER;HI8QuriZBc* zih^FRM^O}v$79;<_PoaUzW*zu!^3~Cl)|m+8m#?-5Mp(j-47Qk_q(pDDtf&hXV0Dm zrftZTOLw?(5z;7h80X`5f$5Xj@>EpgyojS5em$ zbzS3m9!Zkm`#$f#|2{zw;CUW{!GIGdPRMSz`#CoZ!(Vk>cfRSzm1!GNaqiqX&YU?z zRaGQOg4UWW%kX`lvJ{*=c?`!f3tEX%lg^9C!ckNt|PwSMN85&vnL(&=<~`st@RJUql0!)P=DrlA!@ z0ZK`pdb-aGFDx@UobcywZSeSsHfPT+arW#npdiniy#4lst5>gZ`t%t9@;qm0X^Ail zNs@%MwKc9^yGAQ%KkL+WeWtGKc@=$BaU6%){r!HQ;cz&gxTKW$zJGL@7Nr#X`xQ{o zZnb#nrRR9&{32O)2-JXs*YS27}yA^9|2PjnoImTGxI6`Yf zQ7Amm5QdgdeBv2)c6Qj@+@#rT0@FI9l$wWoRh4Zg48tb(Fb$6)N-4C~JoC&m^!xq! zTyQUD-+M5|7^29ft}BMa2_R6)GMQwIMj4~S9N`$Gtcao}=gysDZEcOwXha;x#Bt1E zFkokAha^cTiXw8_?Y7_PcIULm!-}<*x~@5Q?i@=?OB@^={LF7N#U01NahxMK%&}I` zOk9Fs`n%`3T)le8-OZf-QWJqg7z)a==J@gBxUS3FZ@)c%=Jo5>=W_xeoqoT6kR-_? z8(@qf2m+RtmMF_|zQHVX?lneAIroZ`KBrGD&}vOFT?mWkNv>SZIULu3M-W(ynR@qy z7hWLCGIn-$xOVLtckkYvkFKtiV^?Xt*Jv~zTJc#M;W!ReRWTlqX*3#GYv&;}TSu%M zbzLK+B#u4mx&Y*nks5I6bbVG=HG@G;T~)N&9b9QC%ZfOTdH(t5xpnInX_|8C)G3C; z;k0oyX5{91exu!PlOzdQmJx>GN9MyUG-k@p44NhDQEM`PhSnNm3{pyr5m;;3*cj7l zg+!6hWRh}hA!K!>$(5_S)CxQgqzh9m`FK30-|w@!y2@}koQCUEolA~V>OxtTtgNii>-FX|$D@SO4D+p> zV*S0w%#4_R8mtv0tq>edoH!gi)?~`=SY|AbD?vC0ZR|XE(lli}9y6Ita9tM*XrnJU z2=Rl-WKtGIF?Z3!)CAx6IXF0AV`F3PxfxEou8Ze+^Dvt!;kqtbgYUU4E_7Jvw(vcd zX)b^=6)ELf*?35Iu%-12vKqC=9nmUX!QaNmO@pG$_cLHAf>=} ze1zyD0M~Vi?7hm;*fYsGio_+RN%Ce*=ijVZfy+i+v z8#gG50^j%N9teV9K4QAAJ3SC%sA|KNJKJnmyX@Bc+?@=#lMWbFDf{JwYlmC>zu`4< zRpL2v-jQx?ZLz<p{YK%et{_p<*MNwR8G~$2Zxo+HQwTPmK-QC^!<_`*` z@p#PE))wt{8!6?yiOchxjg1X9Ha4j1nuUc0^1Q~z1L9REN<&^(Q%JRt))kp4v49yS z!kFvb@N7h>0W+SGoDvF}`cX1s3r##O&b?Ow2MuT_Wd51jD z=TzZ`c_NNuilX4%ci$z7BC;%-8=mJmN-4(UF~i{=N@=<+<=1@g5a2&KYW;`DM6HiVH(P%Vcb#;~F$B#c0w9fE; z*8aM#J0Fl{aO=B1%_yPyv?h35k$5qVE10N)QJvunmtS9flI6IG2!4A1jOcA833;CX zb8WOn2rS@QYr$GvDLEV+ezOtB-)%G+U$E9P7z`K=haa0K`g!pJ$=>c5bHVV$b4?7I zmK)RP22_Qn9mEK)qV6{s-WXv-NEn8=u1mk)XLED&PqV!En(zOVC3LjWXrnRKVzF!w z1}~RoIc_u>jK^d4_Vyl%7kwCUBp?iT-qRf15XeMR3q|aNv>P!66}Jv{5N?COi|~Ap z?d@&){XW)`Ztv{8JmXKTnJ#CNrcBp_qAVE>_p)0X8!uK>Nw3$V-EKc}r}|-HEl7U~ z&BN^+BMeqrwAECqMq5p{(W0J68jS{b@7`rJ8X=|R@|DXkHk-|~5yynV)H7ii;s)N& zhKJwusH^(zjg8wc*LD5+J-zoY#|Q%!%i?jLRuG|-gLEXm^r&=+tzs(C+9HHE{JAk@YEH1?)qDN>0A~&Gnh4?rGPKTXMc<;yWQdQLe#BXSAUJZPIJ}7-)ebi<` z2#nTm6-9YoY5lFApK(8qG+RFJ{BVoGU_>))AgqHFE`gtLdo$yE-+h;ZgRO5XrOsPx z{*RO&&6YfnOA(F$>_JghFPCNc53IE}W)s)P5#N*S?dNRoqzF2AvH@7${mD)K=d17C z&hybf_JiO{#@gM7+2f(`ai*|R>c1Il|5!TCzjR&qp9&#P&aQbBRwIr**4BnB^zL%^ z?je`fcQ&qHKlsnS$A63)US=#0eL&(H-+1){U(Q}~q-o0TaDdSUAsmF5jvaQ|=Q~1( zFT1Y$isLw65JDugO=o%JIL`DRE=P00y;9dY%d-5tX