In [3]:
import os
import gc
import re
import urllib.request

import cartopy.crs as ccrs
import matplotlib.pyplot as plt
import numpy as np
from rasterio.warp import transform
import pandas as pd
import xarray as xr
import xesmf as xe

from dask.diagnostics import ProgressBar
from distributed import Client


In [None]:
client = Client()
client

In [24]:
def cast_xy_lonlat(fname):
    da = xr.open_rasterio(fname,chunks={'band':1}).rename("lst").astype('float16')
    # Compute the lon/lat coordinates with rasterio.warp.transform
    ny, nx = len(da['y']), len(da['x'])
    x, y = np.meshgrid(da['x'], da['y'])

    # Rasterio works with 1D arrays
    lon, lat = transform(da.crs, {'init': 'EPSG:4326'},
                         x.flatten(), y.flatten())
    lon = np.asarray(lon).reshape((ny, nx))
    lat = np.asarray(lat).reshape((ny, nx))
    da.coords['lon'] = (('y', 'x'), lon)
    da.coords['lat'] = (('y', 'x'), lat)
    da = da.drop(labels=['x','y'])
    return da


In [31]:
# get the file list of geotifs to mosaic
import fnmatch
flist = []
for file_name in os.listdir("../../../data_general/proc_sif-optim/"):
    if fnmatch.fnmatch(file_name, 'myd11a1_2018-*'):
#         print(file_name)
        flist.append("../../../data_general/proc_sif-optim/"+file_name)
flist.sort()
flist

['../../../data_general/proc_sif-optim/myd11a1_2018-0000000000-0000000000.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000000000-0000001280.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000000000-0000002560.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000000000-0000003840.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000000000-0000005120.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000000000-0000006400.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000001280-0000000000.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000001280-0000001280.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000001280-0000002560.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000001280-0000003840.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000001280-0000005120.tif',
 '../../../data_general/proc_sif-optim/myd11a1_2018-0000001280-0000006400.tif',
 '../../../data_general/proc_sif-optim/m

In [25]:
d0 = cast_xy_lonlat(flist[0])
d0

Unnamed: 0,Array,Chunk
Bytes,1.11 GiB,3.12 MiB
Shape,"(364, 1280, 1280)","(1, 1280, 1280)"
Count,729 Tasks,364 Chunks
Type,float16,numpy.ndarray
"Array Chunk Bytes 1.11 GiB 3.12 MiB Shape (364, 1280, 1280) (1, 1280, 1280) Count 729 Tasks 364 Chunks Type float16 numpy.ndarray",1280  1280  364,

Unnamed: 0,Array,Chunk
Bytes,1.11 GiB,3.12 MiB
Shape,"(364, 1280, 1280)","(1, 1280, 1280)"
Count,729 Tasks,364 Chunks
Type,float16,numpy.ndarray


In [32]:
d0 = cast_xy_lonlat(flist[0])
d1 = cast_xy_lonlat(flist[1])
d2 = cast_xy_lonlat(flist[2])
d3 = cast_xy_lonlat(flist[3])
d4 = cast_xy_lonlat(flist[4])
d5 = cast_xy_lonlat(flist[5])
out0 = xr.concat([d0,d1,d2,d3,d4,d5],dim='x')
out0 = out0.rename({"band":"time"})
out0.coords['time'] = pd.date_range("2018-01-01",periods=364) # NOTE! 364 is cuz I dropped a day when exporting from GEE

In [37]:
d0 = cast_xy_lonlat(flist[6])
d1 = cast_xy_lonlat(flist[7])
d2 = cast_xy_lonlat(flist[8])
d3 = cast_xy_lonlat(flist[9])
d4 = cast_xy_lonlat(flist[10])
d5 = cast_xy_lonlat(flist[11])
out1 = xr.concat([d0,d1,d2,d3,d4,d5],dim='x')
out1 = out1.rename({"band":"time"})
out1.coords['time'] = pd.date_range("2018-01-01",periods=364) # NOTE! 364 is cuz I dropped a day when exporting from GEE

In [38]:
d0 = cast_xy_lonlat(flist[12])
d1 = cast_xy_lonlat(flist[13])
d2 = cast_xy_lonlat(flist[14])
d3 = cast_xy_lonlat(flist[15])
d4 = cast_xy_lonlat(flist[16])
d5 = cast_xy_lonlat(flist[17])
out2 = xr.concat([d0,d1,d2,d3,d4,d5],dim='x')
out2 = out2.rename({"band":"time"})
out2.coords['time'] = pd.date_range("2018-01-01",periods=364) # NOTE! 364 is cuz I dropped a day when exporting from GEE

In [None]:
# this 'seems' to work. concatenate along longitude for groups of files specified
# by the first 10-digit GEE chunk block
d1 = xr.open_rasterio(flist[0],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[1],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[2],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[3],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[4],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[5],chunks=-1).rename("lst").astype('float16')
out = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out = out.rename({"band":"time"})
out.coords['time'] = pd.date_range("2018-01-01",periods=364) # NOTE! 364 is cuz I dropped a day when exporting from GEE

In [42]:
# concat the three horizontal strips along y
out123 = xr.concat([out0,out1,out2],dim='y')
out123.chunk({'time':364})

Unnamed: 0,Array,Chunk
Bytes,12.07 GiB,1.11 GiB
Shape,"(364, 2766, 6434)","(364, 1280, 1280)"
Count,26244 Tasks,18 Chunks
Type,float16,numpy.ndarray
"Array Chunk Bytes 12.07 GiB 1.11 GiB Shape (364, 2766, 6434) (364, 1280, 1280) Count 26244 Tasks 18 Chunks Type float16 numpy.ndarray",6434  2766  364,

Unnamed: 0,Array,Chunk
Bytes,12.07 GiB,1.11 GiB
Shape,"(364, 2766, 6434)","(364, 1280, 1280)"
Count,26244 Tasks,18 Chunks
Type,float16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,135.78 MiB,135.78 MiB
Shape,"(2766, 6434)","(2766, 6434)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 135.78 MiB 135.78 MiB Shape (2766, 6434) (2766, 6434) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",6434  2766,

Unnamed: 0,Array,Chunk
Bytes,135.78 MiB,135.78 MiB
Shape,"(2766, 6434)","(2766, 6434)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,135.78 MiB,135.78 MiB
Shape,"(2766, 6434)","(2766, 6434)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 135.78 MiB 135.78 MiB Shape (2766, 6434) (2766, 6434) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",6434  2766,

Unnamed: 0,Array,Chunk
Bytes,135.78 MiB,135.78 MiB
Shape,"(2766, 6434)","(2766, 6434)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [43]:
client = Client(n_workers=1)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43975 instead


In [45]:
%%time
out123.to_netcdf('../../../data_general/proc_sif-optim/myd_lst/myd11a1_2018.nc',
                 mode='w',
               format='netcdf4',
              encoding = {'lst': {'dtype': 'int16', 
                                  'scale_factor': 0.1, 
                                  '_FillValue': -9999,
                                  'zlib':True}})

KeyboardInterrupt: 

In [None]:
out123

In [None]:
# d1-6 don't seem to be important once they are concatenated into another object
d1 = xr.open_rasterio(flist[6],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[7],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[8],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[9],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[10],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[11],chunks=-1).rename("lst").astype('float16')
out2 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out2 = out2.rename({"band":"time"})
out2.coords['time'] = pd.date_range("2018-01-01",periods=364)

In [None]:
d1 = xr.open_rasterio(flist[12],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[13],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[14],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[15],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[16],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[17],chunks=-1).rename("lst").astype('float16')
out3 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out3 = out3.rename({"band":"time"})
out3.coords['time'] = pd.date_range("2018-01-01",periods=364)

In [None]:
# concat the three horizontal strips along y
out123 = xr.concat([out,out2,out3],dim='y')
out123.chunk({'time':364})

In [None]:
client = Client(n_workers=1)

In [46]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:43975/status,

0,1
Dashboard: http://127.0.0.1:43975/status,Workers: 1
Total threads: 24,Total memory: 125.77 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46411,Workers: 1
Dashboard: http://127.0.0.1:43975/status,Total threads: 24
Started: 26 minutes ago,Total memory: 125.77 GiB

0,1
Comm: tcp://127.0.0.1:40167,Total threads: 24
Dashboard: http://127.0.0.1:43253/status,Memory: 125.77 GiB
Nanny: tcp://127.0.0.1:34249,
Local directory: /home/sami/srifai@gmail.com/work/research/sif-optim/src/python/dask-worker-space/worker-fpzryi2s,Local directory: /home/sami/srifai@gmail.com/work/research/sif-optim/src/python/dask-worker-space/worker-fpzryi2s


In [None]:
out123.to_netcdf('../../../data_general/proc_sif-optim/myd_lst/myd11a1_2018.nc',
               format='netcdf4',
              encoding = {'lst': {'dtype': 'int16', 
                                  'scale_factor': 0.1, 
                                  '_FillValue': -9999,
                                  'zlib':True}})

In [50]:
out123

Unnamed: 0,Array,Chunk
Bytes,12.07 GiB,3.12 MiB
Shape,"(364, 2766, 6434)","(1, 1280, 1280)"
Count,26226 Tasks,6552 Chunks
Type,float16,numpy.ndarray
"Array Chunk Bytes 12.07 GiB 3.12 MiB Shape (364, 2766, 6434) (1, 1280, 1280) Count 26226 Tasks 6552 Chunks Type float16 numpy.ndarray",6434  2766  364,

Unnamed: 0,Array,Chunk
Bytes,12.07 GiB,3.12 MiB
Shape,"(364, 2766, 6434)","(1, 1280, 1280)"
Count,26226 Tasks,6552 Chunks
Type,float16,numpy.ndarray


In [49]:
out2.shape

(364, 206, 6434)

In [None]:
pd.date_range("2018-01-01",periods=364)

In [None]:
d1 = xr.open_rasterio(flist[6]).rename("lst")
d2 = xr.open_rasterio(flist[7]).rename("lst")
d3 = xr.open_rasterio(flist[8]).rename("lst")
d4 = xr.open_rasterio(flist[9]).rename("lst")
d5 = xr.open_rasterio(flist[10]).rename("lst")
d6 = xr.open_rasterio(flist[11]).rename("lst")
out2 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')

In [None]:
d1 = xr.open_rasterio(flist[12]).rename("lst")
d2 = xr.open_rasterio(flist[13]).rename("lst")
d3 = xr.open_rasterio(flist[14]).rename("lst")
d4 = xr.open_rasterio(flist[15]).rename("lst")
d5 = xr.open_rasterio(flist[16]).rename("lst")
d6 = xr.open_rasterio(flist[17]).rename("lst")
out3 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')

In [None]:
out[0,:,:]

In [None]:
# get the file list of geotifs to mosaic
import fnmatch
flist = []
for file_name in os.listdir("../../../data_general/proc_sif-optim/"):
    if fnmatch.fnmatch(file_name, 'myd11a1_2019-*'):
#         print(file_name)
        flist.append("../../../data_general/proc_sif-optim/"+file_name)
flist.sort()

# this 'seems' to work. concatenate along longitude for groups of files specified
# by the first 10-digit GEE chunk block
d1 = xr.open_rasterio(flist[0],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[1],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[2],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[3],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[4],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[5],chunks=-1).rename("lst").astype('float16')
out = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out = out.rename({"band":"time"})
out.coords['time'] = pd.date_range("2019-01-01",periods=364) # NOTE! 364 is cuz I dropped a day when exporting from GEE

# d1-6 don't seem to be important once they are concatenated into another object
d1 = xr.open_rasterio(flist[6],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[7],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[8],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[9],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[10],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[11],chunks=-1).rename("lst").astype('float16')
out2 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out2 = out2.rename({"band":"time"})
out2.coords['time'] = pd.date_range("2019-01-01",periods=364)

d1 = xr.open_rasterio(flist[12],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[13],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[14],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[15],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[16],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[17],chunks=-1).rename("lst").astype('float16')
out3 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out3 = out3.rename({"band":"time"})
out3.coords['time'] = pd.date_range("2019-01-01",periods=364)

# concat the three horizontal strips along y
out123 = xr.concat([out,out2,out3],dim='y')
out123.chunk({'time':364})
out123

In [None]:
client = Client(n_workers=1)
out123.to_netcdf('../../../data_general/proc_sif-optim/myd_lst/myd11a1_2019.nc',
               format='netcdf4',
              encoding = {'lst': {'dtype': 'int16', 
                                  'scale_factor': 0.1, 
                                  '_FillValue': -9999,
                                  'zlib':True}})

In [None]:
# get the file list of geotifs to mosaic
import fnmatch
flist = []
for file_name in os.listdir("../../../data_general/proc_sif-optim/"):
    if fnmatch.fnmatch(file_name, 'myd11a1_2020-*'):
#         print(file_name)
        flist.append("../../../data_general/proc_sif-optim/"+file_name)
flist.sort()

# this 'seems' to work. concatenate along longitude for groups of files specified
# by the first 10-digit GEE chunk block
d1 = xr.open_rasterio(flist[0],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[1],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[2],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[3],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[4],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[5],chunks=-1).rename("lst").astype('float16')
out = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out = out.rename({"band":"time"})
out.coords['time'] = pd.date_range("2020-01-01",periods=365) # NOTE! 2020 files have 365 days

# d1-6 don't seem to be important once they are concatenated into another object
d1 = xr.open_rasterio(flist[6],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[7],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[8],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[9],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[10],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[11],chunks=-1).rename("lst").astype('float16')
out2 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out2 = out2.rename({"band":"time"})
out2.coords['time'] = pd.date_range("2020-01-01",periods=365)

d1 = xr.open_rasterio(flist[12],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[13],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[14],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[15],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[16],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[17],chunks=-1).rename("lst").astype('float16')
out3 = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out3 = out3.rename({"band":"time"})
out3.coords['time'] = pd.date_range("2020-01-01",periods=365)

# concat the three horizontal strips along y
out123 = xr.concat([out,out2,out3],dim='y')
out123.chunk({'time':365})
out123

client = Client(n_workers=1)
out123.to_netcdf('../../../data_general/proc_sif-optim/myd_lst/myd11a1_2020.nc',
               format='netcdf4',
              encoding = {'lst': {'dtype': 'int16', 
                                  'scale_factor': 0.1, 
                                  '_FillValue': -9999,
                                  'zlib':True}})

In [None]:
# restart client?
client = Client(n_workers=6)
client.has_what

In [None]:
client

In [None]:
ref_grid = xr.open_dataset("../../../data_general/proc_sif-optim/covar_nc/sif_refgrid.nc")['SIF']
da = xr.open_dataset("../../../data_general/proc_sif-optim/myd_lst/myd11a1_2018.nc")

In [None]:
d1 = xr.open_rasterio(flist[0],chunks=-1).rename("lst").astype('float16')
d2 = xr.open_rasterio(flist[1],chunks=-1).rename("lst").astype('float16')
d3 = xr.open_rasterio(flist[2],chunks=-1).rename("lst").astype('float16')
d4 = xr.open_rasterio(flist[3],chunks=-1).rename("lst").astype('float16')
d5 = xr.open_rasterio(flist[4],chunks=-1).rename("lst").astype('float16')
d6 = xr.open_rasterio(flist[5],chunks=-1).rename("lst").astype('float16')
out = xr.concat([d1,d2,d3,d4,d5,d6],dim='x')
out = out.rename({"band":"time"})
out.coords['time'] = pd.date_range("2018-01-01",periods=364) # NOTE! 364 is cuz I dropped a day when exporting from GEE

In [None]:
d1 = out.isel({"time":0})

In [None]:
# Compute the lon/lat coordinates with rasterio.warp.transform
ny, nx = len(da['y']), len(da['x'])
x, y = np.meshgrid(da['x'], da['y'])

# Rasterio works with 1D arrays
lon, lat = transform(d1.crs, {'init': 'EPSG:4326'},
                     x.flatten(), y.flatten())
lon = np.asarray(lon).reshape((ny, nx))
lat = np.asarray(lat).reshape((ny, nx))
da.coords['lon'] = (('y', 'x'), lon)
da.coords['lat'] = (('y', 'x'), lat)


In [None]:
ref_grid

In [None]:
regridder = xe.Regridder(da, ref_grid, 'bilinear')
# regridder.clean_weight_file()
regridder

In [None]:
da_out = regridder(da)

In [None]:
ds = ds.rename({"band_data":'lst'})

In [None]:
t1 = ds['lst'][0,:,:].to_numpy()

In [None]:
plt.imshow(t1)

In [None]:
ds1 = ds['lst'][0:364,:,:]

In [None]:
ds1

In [None]:
client = Client(n_workers=1)

In [None]:
ds1 = ds1.compute()

In [None]:
ds1.to_netcdf('../../../data_general/proc_sif-optim/myd_lst/myd11a1_2018_regrid.nc',
               format='netcdf4',
              encoding = {'lst': {'dtype': 'int16', 'scale_factor': 0.1, '_FillValue': -9999}})

In [None]:
ds1 = xr.open_dataarray("../../../data_general/proc_sif-optim/covar_nc/myd11a1_2018-0000000000-0000000000_regrid.nc",chunks=-1)
ds1

In [None]:
ds2 = xr.open_dataarray("../../../data_general/proc_sif-optim/covar_nc/myd11a1_2019-0000000000-0000000000_regrid.nc",chunks=-1)

In [None]:
ds3 = xr.open_dataarray("../../../data_general/proc_sif-optim/covar_nc/myd11a1_2019-0000000000-0000000000_regrid.nc",chunks=-1)

In [None]:
ds = xr.concat([ds1,ds2,ds3],dim='time')

In [None]:
xr.open_dataarray(flist[0])

In [None]:
import re
y = re.search("(\\d{4})",flist[0])
y = y.group()
vec_time = pd.date_range(y+"-01-01", periods=364)

In [None]:
365*2

In [None]:
test = xr.open_rasterio('../../../data_general/proc_sif-optim/test_export_res.tif')

In [None]:
test['x']

In [None]:
ref = xr.open_dataset("../../../data_general/proc_sif-optim/covar_nc/sif_refgrid.nc")['SIF']
ref

In [None]:
ref['lon'].shape

In [None]:
test['x'].shape

In [None]:
bcompare = np.isin(ref['lon'],test['x'])

In [None]:
ref['lon'][0:100]

In [None]:
test['x'][0:100]

In [None]:
import dask.array as da
x = da.random.random((1000,1000,1000), chunks='16 MiB')
y = (x + x.T) - x.mean(axis=0)
y.sum().compute()

In [None]:
del(x)