if __name__ == "__main__": import platform, sys, os import xarray as xr import numpy as np import pandas as pd import warnings warnings.filterwarnings('ignore') # plotting modules import hvplot.pandas # noqa import hvplot.xarray # noqa import holoviews as hv from IPython.display import Image from pygmtsar import SBAS import os import rasterio import rioxarray import zipfile import glob from pygmtsar import tqdm_dask # default chunksize (512) is enough suitable for a single subswath processing using resolution 15m # select higher chunk size (1024) to process multiple subswaths and scenes using resolution 15m from pygmtsar import datagrid datagrid.chunksize = 1024 ORBIT = 'D' WORKDIR = 'valeProc' DATADIR = 'sen1_extracted' DEMFILE = None BASEDAYS = 100 BASEMETERS = 150 new = False import dask, dask.distributed # increase timeouts to work using default chunk size (512) even on large areas dask.config.set({'distributed.comm.timeouts.tcp': '60s'}) #print (dask.config.get("distributed.comm.timeouts.tcp")) dask.config.set({'distributed.comm.timeouts.connect': '60s'}) #print (dask.config.get("distributed.comm.timeouts.connect")) from dask.distributed import Client, LocalCluster cluster = LocalCluster(n_workers=1) client = Client(cluster) #MASTER = '2018-02-15' #sbas = SBAS(DATADIR, DEMFILE, basedir=WORKDIR).set_master(MASTER) # establish key variables tested in if/pass interrogators pairs = None sbas = None unwraps = None phasefilts = None disps = None unwraps_detrend = None pickle_file = WORKDIR + "/SBAS.pickle" if os.path.exists(pickle_file) == True: sbas = SBAS.restore(WORKDIR) pairs = sbas.find_pairs() else: sbas = SBAS(DATADIR, DEMFILE, basedir=WORKDIR) sbas.to_dataframe() sbas.download_orbits() sbas.download_dem(backend='GMT', resolution_meters=90) sbas.stack_parallel(n_jobs=4) baseline_pairs = sbas.baseline_pairs(days=BASEDAYS, meters=BASEMETERS) sbas.topo_ra_parallel() topo_ra = sbas.get_topo_ra() pairs = baseline_pairs[['ref_date', 'rep_date']] # SRTM3 DEM resolution 3 sec, 90 m but use 60 m instead to follow NetCDF chunks size 512x512 decimator = sbas.pixel_decimator(resolution_meters=60, debug=True) # default parameters: wavelength=200, psize=32, func=None (no postprocessing) sbas.intf_parallel(pairs, func=decimator, n_jobs=8) # n jobs limited by bug in GMTSAR sbas.merge_parallel(pairs, n_jobs=1) sbas.to_dataframe() # dump current state into WORKDIR sbas.dump() new = True if new == True: sbas = SBAS.restore(WORKDIR) pairs = sbas.find_pairs() sbas.geocode_parallel(pairs) phasefilts = sbas.open_grids(pairs, 'phasefilt') phasefilts_ll = sbas.open_grids(pairs, 'phasefilt', geocode=True) corrs = sbas.open_grids(pairs, 'corr') CORRLIMIT = 0.075 #unwrap interferrograms print('UNWRAP INTERFERROGRAMS IF REQUIRED...') # fill NODATA gaps for valid pixels only interpolator = lambda corr, unwrap: sbas.nearest_grid(unwrap).where(corr>=0) # count merged outputs to avoid reprocessing phase_list = glob.glob(WORKDIR+'/F123*phasefilt.grd') corr_list = glob.glob(WORKDIR+'/F123*corr.grd') unwrap_list = glob.glob(WORKDIR+'/F123*unwrap.grd') detrend_list = glob.glob(WORKDIR+'/F123*detrend.grd') if len(corr_list) != len(unwrap_list): sbas.unwrap_parallel(pairs, threshold=CORRLIMIT, func=interpolator, n_jobs=6) unwraps = sbas.open_grids(pairs, 'unwrap', n_jobs=6) else: pass # calculate displacement (mm)ls print("DETREND UNWRAPPED INTERFERROGRAMS IF REQUIRED...") if len(corr_list) != len(detrend_list): sbas.detrend_parallel(pairs, wavelength=12000, n_jobs=6) unwraps_detrend = sbas.open_grids(pairs, 'detrend', 
    # count displacement grd files to decide whether the SBAS inversion must run
    print('PROCESS SBAS TIME SERIES IF REQUIRED...')
    disp_list = glob.glob(WORKDIR + '/F123_disp*.grd')
    dates = np.unique(pairs.values.flatten() if isinstance(pairs, pd.DataFrame) else pairs.flatten())
    if len(disp_list) != len(dates):
        sbas.sbas_parallel(pairs, n_jobs=1)

    # restrict gap-filling to pixels that are valid in every interferogram
    valid_area = corrs.min('pair')
    def interp(da):
        return sbas.nearest_grid(da).where(~np.isnan(valid_area))
    # use the wrapper "interp" instead of "sbas.nearest_grid" in the post-processing function lists below

    print('EXPORT DISPLACEMENT')
    #sbas_disps = sbas.open_grids(dates, 'disp', func=[sbas.los_displacement_mm], n_jobs=2)
    #sbas_disps_total_ll = sbas.intf_ra2ll(sbas_disps.cumsum('date'))
    disps = sbas.open_grids(dates, 'disp', func=[sbas.los_displacement_mm, interp], n_jobs=2)
    disps_ll = sbas.open_grids(dates, 'disp', func=[sbas.los_displacement_mm, interp], geocode=True, n_jobs=2)

    # blank the first, zero-filled displacement map for better visualization
    disps_ll[0] = np.nan
    # calculate cumulative displacement as GMTSAR SBAS does
    disps_cum = disps_ll.cumsum('date')
    # blank the first, zero-filled displacement map for better visualization
    disps_cum[0] = np.nan

    # create a directory for the exported grids
    displacement_dir = WORKDIR + "_displacement"
    os.makedirs(displacement_dir, exist_ok=True)

    # iterate through all displacement dates and export NetCDF and GeoTIFF for each
    for count, da in enumerate(disps_cum, start=1):
        print(da)
        filename_nc = f'{displacement_dir}/{WORKDIR}_{count}_cum_disp.nc'
        filename_tif = f'{displacement_dir}/{WORKDIR}_{count}_cum_disp.tif'
        delayed = da.to_netcdf(filename_nc, engine=sbas.engine, compute=False)
        tqdm_dask(dask.persist(delayed), desc='Saving Total Displacement as NetCDF')
        # tag the grid with EPSG:4326 and convert to GeoTIFF via rioxarray
        disps_netcdf = xr.open_dataset(filename_nc)
        disps_netcdf.rio.write_crs(
            4326, inplace=True,
        ).rio.set_spatial_dims(
            x_dim="lon", y_dim="lat", inplace=True,
        ).rio.write_coordinate_system(inplace=True)
        disps_netcdf["__xarray_dataarray_variable__"].rio.to_raster(filename_tif)
        disps_netcdf.close()
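    # Optional sanity check (a minimal sketch, not part of the original workflow):
    # re-open the exported GeoTIFFs with rasterio and report CRS, shape and bounds
    # to confirm the EPSG:4326 tagging written above survived the round trip.
    for tif in sorted(glob.glob(f'{displacement_dir}/{WORKDIR}_*_cum_disp.tif')):
        with rasterio.open(tif) as src:
            print(tif, src.crs, src.shape, src.bounds)

    # shut down the local dask cluster cleanly
    client.close()
    cluster.close()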