if __name__ == "__main__": import platform, sys, os import xarray as xr import numpy as np import pandas as pd import warnings warnings.filterwarnings('ignore') # plotting modules import hvplot.pandas # noqa import hvplot.xarray # noqa import holoviews as hv from IPython.display import Image from pygmtsar import SBAS import os import rasterio import rioxarray import zipfile import glob from pygmtsar import tqdm_dask # default chunksize (512) is enough suitable for a single subswath processing using resolution 15m # select higher chunk size (1024) to process multiple subswaths and scenes using resolution 15m from pygmtsar import datagrid datagrid.chunksize = 1024 ORBIT = 'D' WORKDIR = 'valeProc' DATADIR = 'test' #DEMFILE = 'dem/brum_dem.nc' DEMFILE = None BASEDAYS = 100 BASEMETERS = 150 RESOLUTION = 30 #RESOLUTION = 30 new = False import dask, dask.distributed # increase timeouts to work using default chunk size (512) even on large areas dask.config.set({'distributed.comm.timeouts.tcp': '60s'}) #print (dask.config.get("distributed.comm.timeouts.tcp")) dask.config.set({'distributed.comm.timeouts.connect': '60s'}) #print (dask.config.get("distributed.comm.timeouts.connect")) from dask.distributed import Client, LocalCluster cluster = LocalCluster(n_workers=1) client = Client(cluster) #MASTER = '2018-02-15' #sbas = SBAS(DATADIR, DEMFILE, basedir=WORKDIR).set_master(MASTER) # establish key variables tested in if/pass interrogators pairs = None sbas = None unwraps = None phasefilts = None disps = None unwraps_detrend = None pickle_file = WORKDIR + "/SBAS.pickle" if os.path.exists(pickle_file) == True: sbas = SBAS.restore(WORKDIR) pairs = sbas.find_pairs() else: sbas = SBAS(DATADIR, DEMFILE, basedir=WORKDIR) sbas.to_dataframe() sbas.download_orbits() sbas.download_dem(backend='GMT', resolution_meters=RESOLUTION) topo_ll = sbas.get_dem() print('DEM IS: ', topo_ll) print('stack parallel') sbas.stack_parallel(n_jobs=4) print('generate baseline pairs') baseline_pairs = sbas.baseline_pairs(days=BASEDAYS, meters=BASEMETERS) print('convert DEM to radar coodinates') sbas.topo_ra_parallel() topo_ra = sbas.get_topo_ra() pairs = baseline_pairs[['ref_date', 'rep_date']] # SRTM3 DEM resolution 3 sec, 90 m but use 60 m instead to follow NetCDF chunks size 512x512 decimator = sbas.pixel_decimator(resolution_meters=RESOLUTION, debug=True) # default parameters: wavelength=200, psize=32, func=None (no postprocessing) sbas.intf_parallel(pairs, func=decimator, n_jobs=8) # n jobs limited by bug in GMTSAR sbas.merge_parallel(pairs, n_jobs=1) sbas.to_dataframe() # dump current state into WORKDIR sbas.dump() new = True if new == True: sbas = SBAS.restore(WORKDIR) pairs = sbas.find_pairs() sbas.geocode_parallel(pairs) phasefilts = sbas.open_grids(pairs, 'phasefilt') phasefilts_ll = sbas.open_grids(pairs, 'phasefilt', geocode=True) corrs = sbas.open_grids(pairs, 'corr') CORRLIMIT = 0.075 CORRSTACKLIMIT = 0.15 #unwrap interferrograms print('UNWRAP INTERFERROGRAMS IF REQUIRED...') # fill NODATA gaps for valid pixels only interpolator = lambda corr, unwrap: sbas.nearest_grid(unwrap).where(corr>=0) # count merged outputs to avoid reprocessing phase_list = glob.glob(WORKDIR+'/F123*phasefilt.grd') corr_list = glob.glob(WORKDIR+'/F123*corr.grd') unwrap_list = glob.glob(WORKDIR+'/F123*unwrap.grd') detrend_list = glob.glob(WORKDIR+'/F123*detrend.grd') if len(corr_list) != len(unwrap_list): # create mask for low-coherence and water areas corr_stack = corrs.mean('pair') corr_mask = xr.where(corr_stack >= CORRSTACKLIMIT,1,0).astype(bool).persist() tqdm_dask(corr_mask, desc='Build Correlation Stack Coherency Binary Mask') sbas.download_landmask() landmask = sbas.get_landmask(inverse_geocode=True).astype(bool).compute() composite_mask = corr_mask & landmask # unwrap one pair to tune snaphu config (recieved error about no file being found - unwrap incomplete - see mobigroup/gmtsar #31) #phase = sbas.open_grids(pairs[:1], 'phasefilt')[0] #corr = sbas.open_grids(pairs[:1], 'corr')[0] # # # use specific snaphu configuration to avoid using too much ram conf = sbas.PRM().snaphu_config(NTILEROW=4, NTILECOL=4, ROWOVRLP=200, COLOVRLP=200) print(conf) #sbas.unwrap(pairs[0], mask=composite_mask, conf=conf, conncomp=True, interactive=False, debug=True) sbas.unwrap_parallel(mask=composite_mask, conf=conf, n_jobs=1) #sbas.unwrap_parallel(pairs, threshold=CORRLIMIT, func=interpolator, n_jobs=2) unwraps = sbas.open_grids(pairs, 'unwrap', n_jobs=2) else: pass # calculate displacement (mm)ls print("DETREND UNWRAPPED INTERFERROGRAMS IF REQUIRED...") if len(corr_list) != len(detrend_list): sbas.detrend_parallel(pairs, wavelength=12000, n_jobs=2) unwraps_detrend = sbas.open_grids(pairs, 'detrend', n_jobs=2) else: pass # list displacement grd files print("PROCESS SBAS TIME SERIES IF REQUIRED...") disp_list = glob.glob(WORKDIR+'/F123_disp*.grd') dates = np.unique(pairs.values.flatten() if isinstance(pairs, pd.DataFrame) else pairs.flatten()) if len(disp_list) != len(dates): sbas.sbas_parallel(pairs, n_jobs=2) else: pass valid_area = corrs.min('pair') def interp(da): return sbas.nearest_grid(da).where(~np.isnan(valid_area)) # use wrapper "interp" instead of "sbas.nearest_grid" for post-processing functions list below print('EXPORT DISPLACEMENT') sbas_disps = sbas.open_grids(dates, 'disp', func=[sbas.los_displacement_mm], n_jobs=2) sbas_disps_total_ll = sbas.intf_ra2ll(sbas_disps.cumsum('date')) #disps = sbas.open_grids(dates, 'disp', func=[sbas.los_displacement_mm, interp], n_jobs=2) #disps_ll = sbas.open_grids(dates, 'disp', func=[sbas.los_displacement_mm, interp], geocode=True, n_jobs=2) # clean 1st zero-filled displacement map for better visualization #disps_ll[0] = np.nan # calculate cumulative displacement like to GMTSAR SBAS #disps_cum = disps_ll.cumsum('date') # clean 1st zero-filled displacement map for better visualization #disps_cum[0] = np.nan # create directory for plots displacement_dir = WORKDIR+"_displacement" check_dir = os.path.exists(displacement_dir) if check_dir == False: os.mkdir(displacement_dir) #count = 0 print('EXPORT NETCDF') print(sbas_disps_total_ll) filename_nc = f'{displacement_dir}/{WORKDIR}_cum_disp_30m.nc' filename_tif = f'{displacement_dir}/{WORKDIR}_cum_disp_30m.tif' delayed = sbas_disps_total_ll.to_netcdf(filename_nc, engine=sbas.engine, compute=False) tqdm_dask(dask.persist(delayed), desc='Saving Total Displacement as NetCDF') disps_netcdf = xr.open_dataset(filename_nc) print("EXPORT OUTPUT TO TIF...") print(list(disps_netcdf.keys())) disps_netcdf.rio.write_crs( 4326, inplace=True, ).rio.set_spatial_dims( x_dim="lon", y_dim="lat", inplace=True, ).rio.write_coordinate_system(inplace=True) disps_netcdf["los"].rio.to_raster(filename_tif) disps_netcdf.close() #for i in sbas_disps_total_ll: # print(i) # print('EXPORT NETCDF') # filename_nc = f'{displacement_dir}/{WORKDIR}_cum_disp_30m_{count}.nc' # filename_tif = f'{displacement_dir}/{WORKDIR}_cum_disp_30m_{count}.tif' # delayed = i.to_netcdf(filename_nc, engine=sbas.engine, compute=False) # tqdm_dask(dask.persist(delayed), desc='Saving Total Displacement as NetCDF') # disps_netcdf = xr.open_dataset(filename_nc) # print("EXPORT OUTPUT TO TIF...") # print(list(disps_netcdf.keys())) # disps_netcdf.rio.write_crs( # 4326, # inplace=True, # ).rio.set_spatial_dims( # x_dim="lon", # y_dim="lat", # inplace=True, # ).rio.write_coordinate_system(inplace=True) # disps_netcdf["los"].rio.to_raster(filename_tif) # disps_netcdf.close() # count = count + 1 exit()