# Land relief parameterization in coarser resolution 

This script is used to generate the fine resolution land relief parameters. The data would be cropped into tiles in 6 continents, Africa, Asia, Europe, South America, and North America. Each tile would be extended to have overlap to prevent border effect. The tiles has been generated by `002.generate_tiles.ipynb`, and stored in `equi7_tiles`. 

Each tile runs in its individual process with a stream of parameterization. A landmask is used to mask out the ocean pixels. 

## Part 1: Set up

In [2]:
import os
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd
import numpy as np
from shapely.geometry import Polygon,mapping,box
from shapely import segmentize
import time
import sys
from joblib import Parallel, delayed
from minio import Minio
from eumap.misc import ttprint
import requests
import pandas as pd
import pickle
# set up whiteboxworkflow environment
import whitebox_workflows
from whitebox_workflows import download_sample_data, show, WbEnvironment
wbe = whitebox_workflows.WbEnvironment()

In [5]:
## scale factors for each parameters
p_table=pd.read_csv('scaling.csv')
p_table.head()

Unnamed: 0,parameters,old_data_type,ori_min,ori_max,multiplier,new_data_type,final_min,final_max,no_data
0,geomorphon,Int16,1.0,10.0,1,Byte,1.0,10.0,255
1,hillshade,Float32,0.0,28357.0,1,UInt16,0.0,28357.0,65535
2,slope.in.degree,Float32,0.0,15.29,100,UInt16,0.0,1529.0,65535
3,ls.factor,Float32,0.0,13.0,1000,UInt16,0.0,13000.0,65535
4,pro.curv,Float32,-8.210329,8.01173,1000,Int16,-8210.329056,8011.730194,32767


In [9]:
#os.system(f'rm -r tmp-global-geomorpho/*')
with open('shuf.txt', 'r') as file:
    shuf = [int(line.strip()) for line in file]
    
with open(f'equi7_tiles', "rb") as fp:   # Unpickling
    args_whole = pickle.load(fp)
    
start_tile=0
end_tile=790
args = args_whole[start_tile:end_tile]

In [10]:
len(args)

790

## Part 2: parametrization by tiles

The parametrization using pyramid representation. The steps include:

1. create local tile and convert DTM value from decimeter to meter and save it as float
2. crop the landmask to the given tile.
3. apply guassian filter at 30m 
4. apply paramtrization 
5. crop and save the parameters raster into local tile
6. push to back the S3 server

In [None]:
for info in args:
    equi7_bounds_final,equi7_bounds_rls,epsg4326_bounds_rls,equi7_bounds_ls,epsg4326_bounds_ls,tile_name,equi_7_proj = info[0],info[1],info[2],info[3],info[4],info[5],info[6]


    outdir=f'tmp-global-geomorpho/{tile_name}'
    os.makedirs(outdir,exist_ok=True)
    gdalwarp_bbox_rls = ' '.join([str(i) for i in equi7_bounds_rls])
    gdalwarp_bbox_ls = ' '.join([str(i) for i in equi7_bounds_ls])
    gdalwarp_bbox_final = ' '.join([str(i) for i in equi7_bounds_final])
    filepath = 'legendtm_rf_30m_m_s_20000101_20231231_go_epsg.4326_v20250130.tif'

    start_time = time.time()
    tmp_outdir=f'/tmp/tmp-global-geomorpho/{tile_name}'
    os.makedirs(tmp_outdir,exist_ok=True)
    gdal_cmd = f'gdalwarp  -co TILED=YES -co BIGTIFF=YES -co COMPRESS=DEFLATE \
    -co ZLEVEL=9 -co BLOCKXSIZE=1024 -co BLOCKYSIZE=1024 -co NUM_THREADS=8 \
    -co SPARSE_OK=TRUE -of GTiff -overwrite'
    out_ori_file=f'dtm_edtm_m_30m_s_20000101_20221231_{tile_name.lower().replace("_",".")}.rls_equi7_v20241230.tif'
    url_rls=f'{ip}://tmp-global-geomorpho/{tile_name}/{out_ori_file}'

    r = requests.head(url_rls)
    if r.status_code == 200:
        ttprint(f'{url_rls} exists')
    else:
        rn_file = f'{tmp_outdir}/{out_ori_file}'
        os.system(f'{gdal_cmd} -t_srs "{equi_7_proj}" \
        -te {gdalwarp_bbox_rls} -tr 30 30 -r bilinear {filepath} {tmp_outdir}/scaled_dtm_tmp_rls.tif')
        os.system(f'gdal_calc.py --overwrite -A {tmp_outdir}/scaled_dtm_tmp_rls.tif \
        --outfile={rn_file} --calc="A * 0.1" \
        --type=Float32 --co="COMPRESS=DEFLATE" --co="BLOCKXSIZE=2048" --co="BLOCKYSIZE=2048"')
        
    for resolution in [30,60,120,240]:
        url=f'{ip}://tmp-global-geomorpho/{tile_name}/tan.curv_edtm_m_{resolution}m_s_20000101_20221231_go_epsg.4326_v20241230.tif'
        r = requests.head(url)
        if r.status_code == 200:
            ttprint(f'{tile_name} has been process')
            return

        if resolution==30:
            tmp_dtm_rls_file = f'{tmp_outdir}/dtm_tmp_rls_{resolution}.tif'
            os.system(f'{gdal_cmd} /vsicurl/{url_rls} {tmp_dtm_rls_file}')
            # crop to local land surface tiff
            tmp_dtm_ls_file = f'{tmp_outdir}/dtm_tmp_ls.tif'
            os.system(f'{gdal_cmd} -te {gdalwarp_bbox_ls} /vsicurl/{url_rls} {tmp_dtm_ls_file}')

        else:            
            # crop to regional land surface tiff
            tmp_dtm_rls_file = f'{tmp_outdir}/dtm_tmp_rls_{resolution}.tif'
            os.system(f'{gdal_cmd} -r average -tr {resolution} {resolution} -te {gdalwarp_bbox_rls} /vsicurl/{url_rls} {tmp_dtm_rls_file}')
     
            # crop to local land surface tiff
            tmp_dtm_ls_file = f'{tmp_outdir}/dtm_tmp_ls_{resolution}.tif'
            os.system(f'{gdal_cmd} -r average -tr {resolution} {resolution} -te {gdalwarp_bbox_ls} /vsicurl/{url_rls} {tmp_dtm_ls_file}')
    
        # crop the landmask
        global_landmask_file='{ip}://global/dsm.landmask_ensemble_m_30m_s_20000101_20221231_go_epsg.4326_v4.1.tif'
        tmp_landmask_file = f'{tmp_outdir}/landmask_{resolution}.tif'
        os.system(f'{gdal_cmd} -t_srs "{equi_7_proj}" -r min -tr {resolution} {resolution} -te {gdalwarp_bbox_final} {global_landmask_file} {tmp_landmask_file}')


        start_time = time.time()
        # Reading raster data
        dtm = wbe.read_raster(tmp_dtm_rls_file)
        ttprint(f"{tile_name} read_raster--- %s seconds ---" % (time.time() - start_time))

        file_list=[]
        if resolution == 30:
            start_time = time.time()
            dtm = wbe.gaussian_filter(dtm)
            ttprint(f"{tile_name} calculate gaussian filter--- %s seconds ---" % (time.time() - start_time))    


        # geomorphon
        #tmp_geomorphon_file=tmp_dtm_rls_file.replace('dtm','geomorphon')
        #scale=p_table[p_table['parameters']=='geomorphon'].multiplier.iloc[0]

        #start_time = time.time()
        #geomorphon=wbe.geomorphons(dtm, search_distance=3, 
        #                          output_forms=True, analyze_residuals=False)
        #wbe.write_raster(geomorphon*scale, tmp_geomorphon_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        #ttprint(f"{tile_name} calculate geomporphon--- %s seconds ---" % (time.time() - start_time))    
        #file_list.append(tmp_geomorphon_file)


        # fill depression for hydrological analysis
        tmp_flled_dtm_file=tmp_dtm_rls_file.replace('dtm','nodepress.dtm')
        scale=p_table[p_table['parameters']=='nodepress.dtm'].multiplier.iloc[0]

        start_time = time.time()
        dtm_no_deps = wbe.breach_depressions_least_cost(dtm, fill_deps=True, flat_increment=0.001)
        wbe.write_raster(dtm_no_deps*scale, tmp_flled_dtm_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} fill depressions--- %s seconds ---" % (time.time() - start_time))    
        file_list.append(tmp_flled_dtm_file)


        # slope for hydrology
        tmp_slope_file=tmp_dtm_rls_file.replace('dtm','slope.in.degree')
        scale=p_table[p_table['parameters']=='slope.in.degree'].multiplier.iloc[0]

        start_time = time.time()
        slope_in_degree = wbe.slope(dtm)
        wbe.write_raster(slope_in_degree*scale, tmp_slope_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} calculate slope--- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_slope_file)

        # SCA
        tmp_sca_file=tmp_dtm_rls_file.replace('dtm','spec.catch')
        scale=p_table[p_table['parameters']=='spec.catch'].multiplier.iloc[0]

        start_time = time.time()
        sca = wbe.qin_flow_accumulation(dtm_no_deps, out_type='sca', log_transform=True)
        wbe.write_raster(sca*scale, tmp_sca_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} specific catchment area--- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_sca_file)

        # ls factor
        tmp_lsfactor_file=tmp_dtm_rls_file.replace('dtm','ls.factor')
        start_time = time.time()
        scale=p_table[p_table['parameters']=='ls.factor'].multiplier.iloc[0]

        ls_factor=wbe.sediment_transport_index(sca, slope_in_degree, sca_exponent=0.4, slope_exponent=1.3)
        wbe.write_raster(ls_factor*scale, tmp_lsfactor_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} calculate ls factor--- %s seconds ---" % (time.time() - start_time))    
        file_list.append(tmp_lsfactor_file)

        #twi
        start_time = time.time()
        tmp_twi_file=tmp_dtm_rls_file.replace('dtm','twi')
        scale=p_table[p_table['parameters']=='twi'].multiplier.iloc[0]

        twi = wbe.wetness_index(specific_catchment_area=sca, slope=slope_in_degree)
        #twi_filled = wbe.fill_missing_data(twi, exclude_edge_nodata=True)
        ttprint(f"{tile_name} topographic wetness index--- %s seconds ---" % (time.time() - start_time))
        wbe.write_raster(twi*scale, tmp_twi_file, compress=True) # Compression is good, but it is a bit slower so here we won't use it.
        file_list.append(tmp_twi_file)

        # Reading raster data
        start_time = time.time()
        # Reading raster data
        dtm = wbe.read_raster(tmp_dtm_ls_file)
        ttprint(f"{tile_name} read local surface raster--- %s seconds ---" % (time.time() - start_time))

        # diff from mean elev
        start_time = time.time()
        tmp_dfme_file=tmp_dtm_ls_file.replace('dtm','dfme')
        scale=p_table[p_table['parameters']=='dfme'].multiplier.iloc[0]

        dfme=wbe.difference_from_mean_elevation(
            dtm, 
            filter_size_x=3, 
            filter_size_y=3)

        wbe.write_raster(dfme*scale, tmp_dfme_file, compress=True) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} diff from mean elev--- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_dfme_file)

        #Spherical Std Dev Of Normals
        start_time = time.time()
        tmp_ssdon_file=tmp_dtm_ls_file.replace('dtm','ssdon')
        scale=p_table[p_table['parameters']=='ssdon'].multiplier.iloc[0]

        start_time = time.time()
        ssdon=wbe.spherical_std_dev_of_normals(
            dtm, 
            filter_size=3 
        )

        wbe.write_raster(ssdon*scale, tmp_ssdon_file, compress=True) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} spherical std dev of normals--- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_ssdon_file)

        if resolution == 30:
            start_time = time.time()
            dtm_g = wbe.gaussian_filter(dtm)
            ttprint(f"{tile_name} calculate gaussian filter--- %s seconds ---" % (time.time() - start_time))
            tmp_dtm_gaus_file=tmp_dtm_rls_file.replace('dtm','filtered.dtm')
            scale=p_table[p_table['parameters']=='filtered.dtm'].multiplier.iloc[0]
            wbe.write_raster(dtm_g*scale, tmp_dtm_gaus_file, compress=True)#, compress=False) # Compression is good, but it 
            del dtm
            dtm = wbe.read_raster(tmp_dtm_gaus_file)
            file_list.append(tmp_dtm_gaus_file)

        # Hillshade
        tmp_hillshade_file=tmp_dtm_ls_file.replace('dtm','hillshade')
        scale=p_table[p_table['parameters']=='hillshade'].multiplier.iloc[0]

        start_time = time.time()
        hs = wbe.multidirectional_hillshade(dtm)
        wbe.write_raster(hs*scale, tmp_hillshade_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} calculate hillshade--- %s seconds ---" % (time.time() - start_time))    
        file_list.append(tmp_hillshade_file)

        # Minic
        tmp_minic_file=tmp_dtm_ls_file.replace('dtm','minic')
        scale=p_table[p_table['parameters']=='minic'].multiplier.iloc[0]

        start_time = time.time()
        minic = wbe.minimal_curvature(dtm, log_transform=True)
        wbe.write_raster(minic*scale, tmp_minic_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} calculate minic--- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_minic_file)


        # Maxic
        tmp_maxic_file=tmp_dtm_ls_file.replace('dtm','maxic')
        scale=p_table[p_table['parameters']=='maxic'].multiplier.iloc[0]

        start_time = time.time()
        maxic = wbe.maximal_curvature(dtm, log_transform=True)
        wbe.write_raster(maxic*scale, tmp_maxic_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} calculate maxic--- %s seconds ---" % (time.time() - start_time))    
        file_list.append(tmp_maxic_file)

        # Openness
        tmp_pos_file=tmp_dtm_ls_file.replace('dtm','pos.openness')
        tmp_neg_file=tmp_dtm_ls_file.replace('dtm','neg.openness')
        start_time = time.time()
        pos,neg = wbe.openness(dtm,dist=3)
        scale=p_table[p_table['parameters']=='pos.openness'].multiplier.iloc[0]
        wbe.write_raster(pos*scale, tmp_pos_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.
        scale=p_table[p_table['parameters']=='neg.openness'].multiplier.iloc[0]

        wbe.write_raster(neg*scale, tmp_neg_file, compress=True)#, compress=False) # Compression is good, but it is a bit slower so here we won't use it.

        ttprint(f"{tile_name} calculate openness--- %s seconds ---" % (time.time() - start_time))    
        file_list.append(tmp_pos_file)
        file_list.append(tmp_neg_file)


        # profile curve
        start_time = time.time()
        tmp_procurv_file=tmp_dtm_ls_file.replace('dtm','pro.curv')
        scale=p_table[p_table['parameters']=='pro.curv'].multiplier.iloc[0]
        procurv = wbe.profile_curvature(dtm, log_transform=True)
        wbe.write_raster(procurv*scale, tmp_procurv_file, compress=True) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} profile curve --- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_procurv_file)


        # shape index
        start_time = time.time()
        tmp_shpindx_file=tmp_dtm_ls_file.replace('dtm','shpindx')
        scale=p_table[p_table['parameters']=='shpindx'].multiplier.iloc[0]
        shpindx=wbe.shape_index(dtm)
        wbe.write_raster(shpindx*scale, tmp_shpindx_file, compress=True)
        ttprint(f"{tile_name} shape index --- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_shpindx_file)

        # ring curvature
        start_time = time.time()
        tmp_ring_curv_file=tmp_dtm_ls_file.replace('dtm','ring.curv')
        scale=p_table[p_table['parameters']=='ring.curv'].multiplier.iloc[0]
        ring_curv=wbe.ring_curvature(dtm, log_transform=True)

        ttprint(f"{tile_name} ring curvature --- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_ring_curv_file)

        # tangential curvatures
        start_time = time.time()
        tmp_tan_curv_file=tmp_dtm_ls_file.replace('dtm','tan.curv')
        scale=p_table[p_table['parameters']=='tan.curv'].multiplier.iloc[0]

        tan_curv=wbe.tangential_curvature(dtm, log_transform=True)
        wbe.write_raster(tan_curv*scale, tmp_tan_curv_file, compress=True) # Compression is good, but it is a bit slower so here we won't use it.
        ttprint(f"{tile_name} tangential curvature --- %s seconds ---" % (time.time() - start_time))
        file_list.append(tmp_tan_curv_file)


        start_time = time.time()
        def para_gdal_warp(file_path,tile_name,bbox,p_table,tmp_landmask_file):
            file_name = file_path.split('/')[-1]
            parameter = file_name.split('_')[0]
            dtype=p_table[p_table['parameters']==parameter].new_data_type.iloc[0]
            no_data=p_table[p_table['parameters']==parameter].no_data.iloc[0]

            gdalcmd = f'gdalwarp -overwrite -ot {dtype} -tr {resolution} {resolution} -te {bbox} -co TILED=YES -co BIGTIFF=YES -co COMPRESS=DEFLATE -co ZLEVEL=9 -co BLOCKXSIZE=2048 -co BLOCKYSIZE=2048 -co NUM_THREADS=8 -co SPARSE_OK=TRUE'

            file_name = parameter + '_edtm' + '_m' + f'_{resolution}m' + '_s' + '_20000101_20221231' + '_go'  + '_epsg.4326' + '_v20241230' + '.tif'
            out_path = f'{outdir}/{file_name}'
            tmp_out_path = f'{outdir}/tmp_{file_name}'
            os.system(f'{gdalcmd} {file_path} {tmp_out_path}')
            # landmasking
            os.system(f'gdal_calc.py -A {tmp_out_path} -B {tmp_landmask_file} --overwrite --outfile={out_path} \
                        --calc="(B==100)*A + (B!=100)*{no_data}" --type={dtype} --co="ZLEVEL=9" --co="COMPRESS=DEFLATE" \
                        --co="BLOCKXSIZE=2048" --NoDataValue={no_data} --co="BLOCKYSIZE=2048" \
                        --co="NUM_THREADS=8" --co="SPARSE_OK=TRUE"')
            os.remove(file_path)
            return out_path,file_name

        args = [(i,tile_name,gdalwarp_bbox_final,p_table,tmp_landmask_file) for i in file_list]
        for arg in args:
            out_file,rn_file=para_gdal_warp(arg[0],arg[1],arg[2],arg[3],arg[4])
            s3_path = f"{tile_name}/{rn_file}"
            client.fput_object(s3_config['bucket'], s3_path, out_file)
            os.remove(out_file)
            ttprint(f'{ip}://tmp-global-geomorpho/{s3_path} on S3')
        os.remove(tmp_dtm_ls_file)
        os.remove(tmp_dtm_rls_file)
        os.system(f'rm -r {tmp_outdir}/*')
        ttprint(f"{tile_name} crop and save to local--- %s seconds ---" % (time.time() - start_time))

Parallel(n_jobs=10)(delayed(worker)(i,p_table) for i in args)