In [2]:
import numpy as np
import scipy.linalg as spla
import dask.array as da
import dask
from dask.distributed import Client, LocalCluster
import verde as vd

In [3]:
client = Client(
    LocalCluster(n_workers=2, threads_per_worker=6, processes=False)
)
client

0,1
Client  Scheduler: inproc://138.253.113.172/456/1  Dashboard: http://138.253.113.172/456/1:8787/status,Cluster  Workers: 2  Cores: 12  Memory: 134.91 GB


In [5]:
data = vd.datasets.fetch_baja_bathymetry().iloc[::30, :].reset_index()
coordinates = (data.longitude, data.latitude)
d = data.bathymetry_m.values
data

Unnamed: 0,index,longitude,latitude,bathymetry_m
0,0,245.00891,27.49555,-636.0
1,30,245.13232,27.37844,-329.0
2,60,245.24715,27.25225,-490.0
3,90,245.37160,27.11603,-793.0
4,120,245.48480,26.99675,-1043.0
...,...,...,...,...
2761,82830,245.95190,23.59457,-3776.0
2762,82860,245.72664,23.80416,-4492.0
2763,82890,245.50449,24.00772,-3738.0
2764,82920,245.29460,24.19820,-3643.0


In [7]:
def scale(x):
    scale_ = np.nanstd(x, axis=0)
    return x/scale_

In [57]:
%%time
A = scale(vd.Spline(mindist=100e3).jacobian(coordinates, coordinates))
p = spla.lstsq(A.T@A, A.T@d)[0]

CPU times: user 9.27 s, sys: 256 ms, total: 9.53 s
Wall time: 9.19 s


In [10]:
chunk = 1000
east, north = data.longitude.values, data.latitude.values
deast, dnorth, dd = [
    da.from_array(i, chunks=(chunk,)) for i in (east, north, d)
]

In [11]:
def dask_green(e, n, md):
    d = da.sqrt(e**2 + n**2) + md
    return d**2 * (da.log(d) - 1)

def jacobian_dask(e, n, pe, pn, md):
    A = dask_green(
        e[np.newaxis].T - pe,
        n[np.newaxis].T - pn,
        md
    )
    return A

def scale_dask(x):
    scale_ = da.nanstd(x, axis=0)
    return x/scale_

In [72]:
%%time
B = scale_dask(jacobian_dask(deast, dnorth, deast, dnorth, 100e3))
hessian = B.T@B
gradient = B.T@dd
#size = hessian.shape[0]
#nc = size//chunk
#hessian = hessian.rechunk({0: -1, 1: 2000}, block_size_limit=1e8)
hessian = hessian.rechunk(chunk)
pa = da.linalg.lstsq(hessian, gradient)[0].compute()
#pa = da.linalg.solve(hessian, gradient).compute()
#pa = spla.lstsq(hessian.compute(), gradient.compute())[0]

NotImplementedError: qr currently supports only tall-and-skinny (single column chunk/block; see tsqr)
and short-and-fat (single row chunk/block; see sfqr) matrices

Consider use of the rechunk method. For example,

x.rechunk({0: -1, 1: 'auto'}) or x.rechunk({0: 'auto', 1: -1})

which rechunk one shorter axis to a single chunk, while allowing
the other axis to automatically grow/shrink appropriately.

In [73]:
hessian

Unnamed: 0,Array,Chunk
Bytes,61.21 MB,8.00 MB
Shape,"(2766, 2766)","(1000, 1000)"
Count,161 Tasks,9 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 61.21 MB 8.00 MB Shape (2766, 2766) (1000, 1000) Count 161 Tasks 9 Chunks Type float64 numpy.ndarray",2766  2766,

Unnamed: 0,Array,Chunk
Bytes,61.21 MB,8.00 MB
Shape,"(2766, 2766)","(1000, 1000)"
Count,161 Tasks,9 Chunks
Type,float64,numpy.ndarray


In [59]:
np.testing.assert_allclose(p, pa)

AssertionError: 
Not equal to tolerance rtol=1e-07, atol=0

Mismatch: 100%
Max absolute difference: 145.98236626
Max relative difference: 98.93263231
 x: array([-744.980257,   31.568263,  -53.567211, ...,  -62.509019,
        -24.610384,  100.793315])
 y: array([-747.50944 ,  124.753735, -162.208116, ...,  -64.290728,
         -0.813837,  133.498033])

In [52]:
np.testing.assert_allclose(A, B.compute())

In [53]:
np.testing.assert_allclose(A.T@A, (B.T@B).compute())

In [54]:
np.testing.assert_allclose(A.T@d, (B.T@dd).compute())

In [55]:
pa = spla.lstsq((B.T@B).compute(), (B.T@dd).compute())[0]
pa

array([ 6164.31018826,  -674.93260682, -1911.65570904, ...,
        -149.02316376,  1300.74815706, -2509.41718547])

In [56]:
p

array([-6340.64337945,   483.08877072,  -989.76398839, ...,
         -17.5917338 ,  1198.04619532, -2540.52025673])