## Cholesky QR Decomposition of a Tall and Skinny Matrix
In this notebook we explore a simple, yet numerically fragile, variant of the QR decomposition based on the Cholesky factorization.

**Note:** This is not the direct TSQR method proposed in the article [here](https://arxiv.org/abs/1301.1071). The Cholesky-based approach lacks both accuracy and numerical stability, making it impractical for high-performance environments where accuracy is vital. Nevertheless, we found it interesting to implement as an exercise, exploring its implementation in Dask and benchmarking it despite its inherent numerical issues.

Let $B$ be a symmetric positive definite $n \times n$ matrix. Its Cholesky decomposition is:
$$
B = L L^T
$$
where $L$ is a lower triangular $n \times n$ matrix. Cholesky decomposition is particularly useful when computing the QR decomposition of an $m \times n$ matrix $A$.  

We first build the temporary matrix $T = A^T A$, which is symmetric by construction. If, by QR decomposition:
$$
A = QR
$$
then:
$$
T = A^T A = (QR)^T(QR) = R^T Q^T Q R
$$
Since $Q$ is orthogonal, we have $T = R^T R$. Notice that $R$ is also triangular, meaning that we have effectively obtained the Cholesky decomposition of $T$ in terms of $R$.  

In other words, the Cholesky QR decomposition proceeds as follows:
1. Given $A$, compute the symmetric matrix $T = A^T A$  
2. Apply the Cholesky decomposition: $T = L L^T$  
3. Set $R = L^T$ and compute $Q$ from $A = QR$  

This procedure is sadly known to be numerically unstable and may fail to yield accurate results, particularly for the orthogonal matrix $Q$. However, it is highly parallelizable, making it a convenient testbed for experimenting with Dask.  



In [None]:
# CLUSTER DEPLOYMENT ON CLOUDVENETO
from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    ["10.67.22.154", "10.67.22.216", "10.67.22.116", "10.67.22.113"],  # 1 VM as scheduler, 3 VMs as hosting workers
    connect_options={"known_hosts": None},
    remote_python="/home/ubuntu/miniconda3/bin/python",
    scheduler_options={"port": 8786, "dashboard_address": ":8797"},
    worker_options={
        "nprocs": 1,       # N. of processess per VM. CloudVeneto's large VMs offers 4-core CPU, but for now we only spawn 1 process per VM
        "nthreads": 1      # N. of threads per process
    }
)

client = Client(cluster)

In [2]:
# check if everything went smoothly
print(client)
print(cluster)

<Client: 'tcp://10.67.22.154:8786' processes=3 threads=3, memory=5.81 GiB>
SSHCluster(SSHCluster, 'tcp://10.67.22.154:8786', workers=3, threads=3, memory=5.81 GiB)


## Naive implementation of the Cholesky Algorithm
Let's start by importing the necessary stuff along with the California housing dataset (we'll use it as a first test).

In [3]:
import dask.array as da
import numpy as np
from sklearn.datasets import fetch_california_housing

# Download California Housing dataset
data = fetch_california_housing(as_frame=True)

# Convert features into Dask Array (it's a matrix).
n_partition = 3        # number of partition in memory. We have 4 VMS (1 master + 3 workers), so let's start with just 3 partitions
length_partition = data.data.shape[0] // n_partition
# We will partition the dataset by rows, so that each chunk has all the features available
X_da = da.from_array(data.data.values, chunks=(length_partition, data.data.shape[1]))

print("Number of Dask partitions:",  X_da.npartitions) 
print("Length of each partition:", length_partition, "rows")
print("Length of the whole dataset:", data.data.shape[0], "rows")

Number of Dask partitions: 3
Length of each partition: 6880 rows
Length of the whole dataset: 20640 rows


In [4]:
# Let's print X_da to verify the correct partitioning
X_da

Unnamed: 0,Array,Chunk
Bytes,1.26 MiB,430.00 kiB
Shape,"(20640, 8)","(6880, 8)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.26 MiB 430.00 kiB Shape (20640, 8) (6880, 8) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",8  20640,

Unnamed: 0,Array,Chunk
Bytes,1.26 MiB,430.00 kiB
Shape,"(20640, 8)","(6880, 8)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


Now we'll define the parallel and serial algorithm for the Cholesky QR decomposition.

This first parallel version of the Cholesky method we propose works as follows:
1) The array should already be splitted by rows in partitions across workers (let's call each partition $A_p$). Each worker computes a local version of $A^T A$, i.e. $A_p^T A_p$. Since $A_p$ is smaller than $A$, the matrix multiplication should proceed faster. Furthermore, $A_p$ being smaller may fully reside in of worker's RAM
2) Once each worker has finished, the full Gram matrix $A^T A$ is computed in a single worker by summing up all the smaller and local $A_p$: $A^T A = \sum_p A_P^T A_p$
3) The matrix $A^T A$ is small, $n\times n$. A serial Cholesky decomposition is performed and will output the final $R$ matrix (up to a custom transposition)
4) To get $Q$, we will use the defining equation $A = QR \Rightarrow Q = A R^{-1}$. Computing the inverse of $R$ is straightforward and can be done by a single worker (a small matrix), whereas the MatMul between $A$ and the inverse of $R$ can be parallelized

In [5]:
import dask
# A FIRST NAIVE IMPLEMENTATION
def compute_choleskyQR_parallel(X_da : dask.array.Array):
    # X_da.persist(); more on this command later
    
    # A list of delayed tasks for each partition of the dataset
    # Each partition computes the local Gram matrix (as a delayed task)
    chunks_delayed = [dask.delayed(lambda x : x.T @ x)(chunk) for chunk in X_da.to_delayed().ravel()]

    # Now sum all the local Gram matrices to get the global Gram matrix
    Gram_global_delayed = dask.delayed(sum)(chunks_delayed)   ## !! This is not strictly parallel, meaning that a single worker will perform the sum instead of a tree-like operation. 

    # Compute R as the Cholesky decomposition on the global Gram matrix
    R = dask.delayed(np.linalg.cholesky)(Gram_global_delayed)
    #R.visualize("fig/CholeskyR.png")
    R = R.compute() # Compute R. This will put a stop at the parallel operation and will send R to the client (not a problem, R is small)
    R_inv = np.linalg.inv(R) # It's a small matrix, so this operation is fast even if serial

    Q = X_da.map_blocks(lambda block: block @ R_inv, dtype=X_da.dtype)
    #Q.visualize("fig/CholeskyQ.png")
    Q = Q.compute() # Compute Q and send it to the client. As of now, this is not a big problem (Q is relatively small). We will adapt this command later on
    return Q, R

def compute_choleskyQR_serial(X):
    # Global gram matrix
    G = X.T @ X
    R = np.linalg.cholesky(G)
    R_inv = np.linalg.inv(R)
    Q = X @ R_inv
    
    return Q, R

The DAG should look like (for the computation of R)

![](fig/CholeskyR.png)



As expected, the three workers act in parallel on the three partitions computing the local Gram matrix (i.e. $A_p^T A_p$). Then, a single worker collect all the Gram matrices and perform the addition and the Cholesky decomposition on the small $n \times n$ matrix.

Let's measure the time it takes to perform the parallel Cholesky QR decomposition:

In [10]:
%%time
# parallel
Q_p, R_p = compute_choleskyQR_parallel(X_da)

CPU times: user 8.05 ms, sys: 0 ns, total: 8.05 ms
Wall time: 66.7 ms


As of now, we have 3 VMs, and we explicitly instructed Dask to create only one worker per node, resulting in 3 workers in total. By inspecting the dashboard, we can see what happens under the hood:

![](fig/CloudVeneto_Cal_3workers.png)

Since we have three workers, we observe three horizontal lanes, each corresponding to a worker. The first three thin greenish bands are labeled _array_ by Dask and represent array access and data loading. This occurs because the dataset is stored on the scheduler VM. When workers need to access the data, the scheduler transfers it over the network. 
If we had executed _X_da.persist()_ before the function call, the data would have already been distributed across the workers’ memory, eliminating these transfers (and the initial greenish blocks would not appear).

The next set of parallel blocks (three, as expected) correspond to the lambda function, i.e., the local matrix multiplications.

The following red block (and the subsequent yellow one) is executed on a single worker, as required by the algorithm. These blocks represent the serial reduction step: one worker gathers all the temporary Gram matrices (red block) and performs the summation (yellow block). Then, the same worker computes the Cholesky decomposition of the resulting matrix.

The rightmost section of the panel corresponds to the computation of $Q$. Due to the algorithm’s dependencies, $Q$ cannot be computed until $R$ is available, meaning the two steps cannot be parallelized. Consequently, $Q$ must wait for $R$ to finish. Again, the red block (and the subsequent yellow block labeled _transfer-finalize_) are executed on a single worker. These steps involve collecting all the matrix blocks, stacking them to form the final $Q$, and sending it back to the client.

All the gaps between the colored segments represent Dask overhead (orchestration, scheduling, client communication, etc.), which in this case appears to be significant. This essentially indicates that the workers were idle for much of the time, while long and costly data transfers (red blocks) dominate the execution.


Indeed, running the same algorithm serially:

In [14]:
%%time
# serial
Q_s, R_s = compute_choleskyQR_serial(data.data.values)

CPU times: user 0 ns, sys: 1.52 ms, total: 1.52 ms
Wall time: 1.09 ms


The serial implementation is much faster than the parallel one. This was, unfortunately, expected for several reasons:

1. **Algorithmic limitations**: the main bottleneck comes from the serial part of the computation: only one worker is responsible for summing all the local matrices. This means that those matrices need to be transfered between nodes. In addition, we did not call _.persist()_ on the initial dataset, and we explicitly requested to _compute_ $Q, R$ and return them directly to the client at the end of the function call. This can be problematic, especially if $Q$ is large.  

2. **Dataset size**: the dataset is relatively small (only $20k$ rows, $1.28$ MB). It easily fits in the master’s RAM (even in the L3 cache!), so there is little benefit in creating partitions and transferring them across the network. In this case, serial NumPy (which also exploits multithreading internally) is naturally faster.  

3. **Partitioning strategy**: we used $n\_\text{partitions} = n\_\text{workers}$, which may not be optimal. Increasing the number of partitions can make individual parallel operations faster (since each block is smaller), but it may also require a single worker to process multiple partitions, introducing additional overhead. The impact of partition count on performance is further explored in the Benchmark notebook.  

Finally, it is worth noting that Cholesky QR is known to be numerically unstable. In fact:


In [15]:
# Let's see whether the results are compatible
diffR = np.linalg.norm(R_p - R_s, 2)
diffQ = np.linalg.norm(Q_p - Q_s, 2)
print(f"||R_parallel - R_serial||_2 = {diffR}")
print(f"||Q_parallel - Q_serial||_2 = {diffQ}")

# Check orthogonality of Q
orthogonality_metric = np.linalg.norm(Q_s.T @ Q_s - np.eye(Q_s.shape[1]), 2)
print(f"||Q^T @ Q- I||_2 = {orthogonality_metric}")
# Check decomposition
decomp_metric = np.linalg.norm(data.data.values - Q_s @ R_s, 2)
print(f"||X - Q @ R||_2 = {decomp_metric}")

||R_parallel - R_serial||_2 = 2.94455696482918e-09
||Q_parallel - Q_serial||_2 = 1.8922761232826944e-10
||Q^T @ Q- I||_2 = 7971678.680289975
||X - Q @ R||_2 = 8.723161201902093e-10


As expected, the decomposition yielded a non reasonnable result (Q is not orthogonal, the algorithm is highly unstable)

## A larger dataset

We want to improve our naive algorithm. But first, let's try with a different and larger dataset (HIGGS dataset, already downloaded on all VMs)

In [None]:
# create again a cluster
client.close()   # close the previous one
cluster.close()  # close the previous one

cluster = SSHCluster(
    ["10.67.22.154", "10.67.22.216", "10.67.22.116", "10.67.22.113"],
    connect_options={"known_hosts": None},
    remote_python="/home/ubuntu/miniconda3/bin/python",
    scheduler_options={"port": 8786, "dashboard_address": ":8797"},
    worker_options={
        "nprocs": 4,        # Now we use all 4 cores -> 12 workers
        "nthreads": 1       # We use 1 threads. Following Dask documentation, however, Numpy should release well the GIL lock thus we could use more than 1 thread. 
    }
)

client = Client(cluster)

In [17]:
print(client)
print(cluster)

<Client: 'tcp://10.67.22.154:8786' processes=12 threads=12, memory=23.25 GiB>
SSHCluster(SSHCluster, 'tcp://10.67.22.154:8786', workers=12, threads=12, memory=23.25 GiB)


In [18]:
cluster

0,1
Dashboard: http://10.67.22.154:8797/status,Workers: 12
Total threads: 12,Total memory: 23.25 GiB

0,1
Comm: tcp://10.67.22.154:8786,Workers: 0
Dashboard: http://10.67.22.154:8797/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://10.67.22.113:39571,Total threads: 1
Dashboard: http://10.67.22.113:44205/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.113:46695,
Local directory: /tmp/dask-scratch-space/worker-yqcxzeqf,Local directory: /tmp/dask-scratch-space/worker-yqcxzeqf

0,1
Comm: tcp://10.67.22.113:39941,Total threads: 1
Dashboard: http://10.67.22.113:41787/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.113:40997,
Local directory: /tmp/dask-scratch-space/worker-sne01k16,Local directory: /tmp/dask-scratch-space/worker-sne01k16

0,1
Comm: tcp://10.67.22.113:40223,Total threads: 1
Dashboard: http://10.67.22.113:41423/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.113:41111,
Local directory: /tmp/dask-scratch-space/worker-awuyhex4,Local directory: /tmp/dask-scratch-space/worker-awuyhex4

0,1
Comm: tcp://10.67.22.113:42995,Total threads: 1
Dashboard: http://10.67.22.113:41313/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.113:44613,
Local directory: /tmp/dask-scratch-space/worker-59nhspcs,Local directory: /tmp/dask-scratch-space/worker-59nhspcs

0,1
Comm: tcp://10.67.22.116:34627,Total threads: 1
Dashboard: http://10.67.22.116:37129/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.116:45239,
Local directory: /tmp/dask-scratch-space/worker-xpeq8mq5,Local directory: /tmp/dask-scratch-space/worker-xpeq8mq5

0,1
Comm: tcp://10.67.22.116:39653,Total threads: 1
Dashboard: http://10.67.22.116:43081/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.116:39005,
Local directory: /tmp/dask-scratch-space/worker-9h2vpzn1,Local directory: /tmp/dask-scratch-space/worker-9h2vpzn1

0,1
Comm: tcp://10.67.22.116:40043,Total threads: 1
Dashboard: http://10.67.22.116:41687/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.116:46171,
Local directory: /tmp/dask-scratch-space/worker-fk0pcay7,Local directory: /tmp/dask-scratch-space/worker-fk0pcay7

0,1
Comm: tcp://10.67.22.116:40983,Total threads: 1
Dashboard: http://10.67.22.116:37463/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.116:35817,
Local directory: /tmp/dask-scratch-space/worker-7moke6l7,Local directory: /tmp/dask-scratch-space/worker-7moke6l7

0,1
Comm: tcp://10.67.22.216:35753,Total threads: 1
Dashboard: http://10.67.22.216:42839/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.216:43957,
Local directory: /tmp/dask-scratch-space/worker-z5gcodhq,Local directory: /tmp/dask-scratch-space/worker-z5gcodhq

0,1
Comm: tcp://10.67.22.216:39241,Total threads: 1
Dashboard: http://10.67.22.216:35645/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.216:43877,
Local directory: /tmp/dask-scratch-space/worker-2rtttkp_,Local directory: /tmp/dask-scratch-space/worker-2rtttkp_

0,1
Comm: tcp://10.67.22.216:41137,Total threads: 1
Dashboard: http://10.67.22.216:45029/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.216:43251,
Local directory: /tmp/dask-scratch-space/worker-88ym1stn,Local directory: /tmp/dask-scratch-space/worker-88ym1stn

0,1
Comm: tcp://10.67.22.216:42517,Total threads: 1
Dashboard: http://10.67.22.216:38397/status,Memory: 1.94 GiB
Nanny: tcp://10.67.22.216:38147,
Local directory: /tmp/dask-scratch-space/worker-7hkkaej0,Local directory: /tmp/dask-scratch-space/worker-7hkkaej0


Now we have 12 workers (4 worker on each VM, excluding the scheduler/master). Let's load the HIGGS dataset through the dask.dataframe API. This will not immediately read the dataset (it won't fit in a single worker's RAM)

In [19]:
import dask.dataframe as dd
import os

os.chdir("/home/ubuntu") 
path_HIGGS = os.getcwd() + "/datasets/HIGGS.csv"
# A huge dataset
df = dd.read_csv(path_HIGGS, header=None, blocksize="200MB")    # The block size can be customized, let's start with 200 MB
X_df = df.iloc[:, 1:] 
X_da = X_df.to_dask_array(lengths=True)

In [20]:
#Let's print it
X_da

Unnamed: 0,Array,Chunk
Bytes,2.29 GiB,58.75 MiB
Shape,"(11000000, 28)","(275002, 28)"
Dask graph,40 chunks in 1 graph layer,40 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 2.29 GiB 58.75 MiB Shape (11000000, 28) (275002, 28) Dask graph 40 chunks in 1 graph layer Data type float64 numpy.ndarray",28  11000000,

Unnamed: 0,Array,Chunk
Bytes,2.29 GiB,58.75 MiB
Shape,"(11000000, 28)","(275002, 28)"
Dask graph,40 chunks in 1 graph layer,40 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [21]:
# Print the number of partition
X_da.npartitions

40

As of now, nothing has yet happened. Let's load in the worker's memory the partitions:

In [22]:
X_da.persist()

Unnamed: 0,Array,Chunk
Bytes,2.29 GiB,58.75 MiB
Shape,"(11000000, 28)","(275002, 28)"
Dask graph,40 chunks in 1 graph layer,40 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 2.29 GiB 58.75 MiB Shape (11000000, 28) (275002, 28) Dask graph 40 chunks in 1 graph layer Data type float64 numpy.ndarray",28  11000000,

Unnamed: 0,Array,Chunk
Bytes,2.29 GiB,58.75 MiB
Shape,"(11000000, 28)","(275002, 28)"
Dask graph,40 chunks in 1 graph layer,40 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


The dataset now resides in the worker's memory. Having a look at the dashboard:


![](fig/read.png)


This means that the dataset was effectively splitted (according to the scheduler's directives) and each chunk sent to the worker. This will allows us to benefit from data locality when performing operation on the dataset

Let's now run our algorithm. However, we can't directly obtain both $Q$ and $R$ on the client machine (_.compute()_). This is because $Q$ is a $m \times n$ matrix, where $m$ here is approximately $11$ billion. If each element is a double ($8 \> B$), then $Q$ is about $88 \> GB$, way to much to be collected on the client RAM. Hence, we modify our function so that both $R$ and $Q$ are persisted but not directly sent to the client

In [23]:
def compute_choleskyQR_parallel(X_da : dask.array.Array):
    
    def gramMatMul(x): #Declaring it this way will make the name appear in the Dask dashboard
        return x.T @ x
    def MatMul(x, R_inv): # Again, as above
        return x @ R_inv
    def Inverse(x):
        return np.linalg.inv(x)
        
    # A list of delayed tasks for each partition of the dataset. Each partition computes the local Gram matrix (as a delayed task)
    chunks_delayed = [dask.delayed(gramMatMul)(chunk) for chunk in X_da.to_delayed().ravel()]
    # Now sum all the local Gram matrices to get the global Gram matrix
    Gram_global_delayed = dask.delayed(sum)(chunks_delayed) 
    # Compute R as the Cholesky decomposition on the global Gram matrix 
    R = dask.delayed(np.linalg.cholesky)(Gram_global_delayed)
    
    R = R.persist() # This time, persist R (compute it but don't send it to the client
    R_inv = dask.delayed(Inverse)(R)

    X_da = X_da.persist()    # Persist again X_da, since X_da.to_delayed seems to cause troubles  (and if X_da is already persisted, this command has no effect)
    Q = X_da.map_blocks(MatMul, R_inv, dtype=X_da.dtype)
    Q = Q.persist() # Persist Q, so that it won't be sent directly to the client
    return Q, R

In [26]:
%%time
from dask.distributed import wait

client.cancel(Q)    # Remove Q and R from the worker's memory if already present, just to have reproducible results
client.cancel(R)

Q, R = compute_choleskyQR_parallel(X_da)
res = wait([Q,R])   # Q and R are not sent to the client, if we don't wait for Q and R to be available in the worker's memory we won't obtain the true timing

CPU times: user 16.6 ms, sys: 12.2 ms, total: 28.8 ms
Wall time: 372 ms


![](fig/HIGGS_cholesky_QR12.png)

The green blocks on the left represent the Gram matrix multiplication on all chunks. The following red is again the transfer and serial sum on all the Gram matrices. Once $R$ is computed, the computation of $Q$ starts by computing the sub-matrices (yellow blocks) that, when stacked up, will yield the full matrix multiplication. No other block is present after those, since $Q$ remains on the worker's memory.

Now that the dataset is larger, the cost of the serial sum-transfer become progressively smaller, but is still a burden to the algorithm's performances

## Tree reduction of addition

Instead of assigning the sum of all $40$ sub-matrices to a single worker, let's build a parallel alternative:

In [27]:
def compute_choleskyQR_parallel_tree(X_da : dask.array.Array):
    
    def gramMatMul(x): #Declaring it this way will make the name appear in the Dask dashboard
        return x.T @ x
    def MatMul(x, R_inv): 
        return x @ R_inv
    def PartialSum(a,b):
        return a+b
    def Inverse(R):
        return np.linalg.inv(R)
        
    # A list of delayed tasks for each partition of the dataset. Each partition computes the local Gram matrix (as a delayed task)
    chunks_delayed = [dask.delayed(gramMatMul)(chunk) for chunk in X_da.to_delayed().ravel()]
    # Sum the chunks pairwise
    while len(chunks_delayed) > 1:
        new_level = []
        for i in range(0, len(chunks_delayed), 2):
            if i + 1 < len(chunks_delayed):
                new_level.append(dask.delayed(PartialSum)(chunks_delayed[i], chunks_delayed[i+1]))
            else:
                new_level.append(chunks_delayed[i])
        chunks_delayed = new_level
    # The final result is in the first position
    Gram_global_delayed = chunks_delayed[0]
    R = dask.delayed(np.linalg.cholesky)(Gram_global_delayed)
    R = R.persist()  # Persist R (compute it, but don't send it to the client though) 
    
    R_inv = dask.delayed(Inverse)(R) # It's a small matrix, so this operation is fast even if serial, no need to parallelize it
    X_da = X_da.persist()    # Persist again X_da, since X_da.to_delayed seems to cause troubles (if already persisted in memory, this does nothing)
    Q = X_da.map_blocks(MatMul, R_inv, dtype = X_da.dtype)
    Q = Q.persist() # Persist Q
    return Q, R     # Now both Q and R are dask.array, distributed across al nodes

Applying this new function

In [30]:
%%time
client.cancel(Q)
client.cancel(R)
Q, R = compute_choleskyQR_parallel_tree(X_da)
res = wait([Q,R])

CPU times: user 24.3 ms, sys: 0 ns, total: 24.3 ms
Wall time: 360 ms


Below a snapshot of the Dask Dashboard with the new version of the algo (the DAG is shown in [link](fig/DAG_Tree.png)). The small red blocks represent data transfers between nodes for their local Gram matrices and the subsequent summation. Since the summation is now performed in a pairwise manner, Dask can orchestrate it dynamically to improve performance and and interleave it with the matrix multiplication computations to improve overall performance. Furthermore, since we are no longer requesting the computation of $R$, the computation of $Q$ can start right after $R$ is found (and no transfer to the client is needed, even if $R$ is relatively small). Put in other words, the blank space between the two section of the dashboard is smaller

The small red blocks appearing before the yellow ones are labeled _transfer-MatMul_. They correspond to the distribution of R_inv to all worker nodes (in fact the node which originally computed the inverse does not display these blocks, since it already has the data locally).

![](fig/HIGGS_cholesky_parallelsum.png)


Let's briefly test the two versions:

In [31]:
import time

no_tree, tree = list(), list()
for _ in range(50):
    # start anew, don't cache previous results
    client.cancel(Q)
    client.cancel(R)
    #start a timer
    start = time.time()
    # launch the function
    Q, R = compute_choleskyQR_parallel(X_da)
    wait([Q, R])   # wait until both Q and R are available in the worker's memory. 
    end = time.time()
    no_tree.append(end-start)
    
for _ in range(50):
    # start anew, don't cache previous results
    client.cancel(Q)
    client.cancel(R)
    #start a timer
    start = time.time()
    # launch the function
    Q, R = compute_choleskyQR_parallel_tree(X_da)
    wait([Q, R])   # wait until both Q and R are available in the worker's memory. 
    end = time.time()
    tree.append(end-start) 

print(f"Average time elapsed when running Cholesky QR (no tree reduction): {np.mean(no_tree):.3f} +/- {np.std(no_tree):.3f}")
print(f"Average time elapsed when running Cholesky QR (tree reduction): {np.mean(tree):.3f} +/- {np.std(tree):.3f}")

Average time elapsed when running Cholesky QR (no tree reduction): 0.349 +/- 0.025
Average time elapsed when running Cholesky QR (tree reduction): 0.398 +/- 0.029


It seems that introducing the tree reduction slightly worsened performance, a trend confirmed across different numbers of partitions. This is likely because adding multiple pairwise summations increases orchestration overhead. In this case, it is actually more efficient to transfer the Gram matrices and aggregate them on a single worker, since these matrices are very small (tipical in tall and skinny matrices). If the Gram matrices had been larger, however, data transfers would have become a significant bottleneck, and the tree reduction approach might have been preferable. 

---
N.B. Function's output:

In [32]:
Q   # Q is a dask.array. It is saved across nodes

Unnamed: 0,Array,Chunk
Bytes,2.29 GiB,58.75 MiB
Shape,"(11000000, 28)","(275002, 28)"
Dask graph,40 chunks in 1 graph layer,40 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 2.29 GiB 58.75 MiB Shape (11000000, 28) (275002, 28) Dask graph 40 chunks in 1 graph layer Data type float64 numpy.ndarray",28  11000000,

Unnamed: 0,Array,Chunk
Bytes,2.29 GiB,58.75 MiB
Shape,"(11000000, 28)","(275002, 28)"
Dask graph,40 chunks in 1 graph layer,40 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [33]:
# If we want to access to some of its contents:
Q[1:100,].compute()

array([[-1.47612920e-03,  2.53373483e-04,  1.02861174e-04, ...,
         2.41738361e-04,  3.88321597e-04,  1.88456408e-03],
       [-1.39534303e-03,  1.78113473e-04, -4.21745580e-04, ...,
         3.07084322e-04,  2.00840189e-04,  1.84154219e-03],
       [-1.71529661e-03,  6.80937516e-06,  2.07760429e-04, ...,
         3.05650473e-04,  1.83738288e-04,  2.26122402e-03],
       ...,
       [-1.36721946e-03,  1.23822149e-04, -3.10010675e-04, ...,
         1.63916970e-04, -7.75422796e-05,  2.29137290e-03],
       [-1.53373224e-03, -1.10148748e-04,  1.03036149e-05, ...,
        -1.23919712e-04,  1.91734705e-06,  1.76111089e-03],
       [-1.68392274e-03, -5.61017193e-04, -3.75900205e-04, ...,
         6.54935770e-04,  1.98873649e-04,  2.27322453e-03]],
      shape=(99, 28))

In [34]:
R  # R is a Delayed object, but its elements are actually already computed and reside on a single worker (it's a small matrix, no need to partition it)

Delayed('cholesky-05b92d6e-9146-4f38-8dea-d01006c2be16')

I decided to leave $R$ as a Delayed object (referencing a Numpy array) and not turn it into a Dask.array because it is a small matrix that does not benefit from distribution across nodes. Converting it into a Dask.array would introduce unnecessary overhead, including extra scheduling and memory management, without improving performance.

In [35]:
R.compute()   # to access the reference R points to
client.who_has(R)   # This shows which worker has R in its memory

Key,Copies,Workers
cholesky-05b92d6e-9146-4f38-8dea-d01006c2be16,1,tcp://10.67.22.216:42517


## Definitive function

In [36]:
def compute_choleskyQR_parallel_optimal(X_da : dask.array.Array):
    
    def gramMatMul(x): #Declaring it this way will make the name appear in the Dask dashboard
        return x.T @ x
    def MatMul(x, R_inv): # Again, as above
        return x @ R_inv
    def Inverse(x):
        return np.linalg.inv(x)
        
    # A list of delayed tasks for each partition of the dataset. Each partition computes the local Gram matrix (as a delayed task)
    chunks_delayed = [dask.delayed(gramMatMul)(chunk) for chunk in X_da.to_delayed().ravel()]
    # Now sum all the local Gram matrices to get the global Gram matrix
    Gram_global_delayed = dask.delayed(sum)(chunks_delayed)   ## !! This is not parallel, but seems to be the best choice
    # Compute R as the Cholesky decomposition on the global Gram matrix (as a delayed even if a serial operation just call .compute at the end)
    R = dask.delayed(np.linalg.cholesky)(Gram_global_delayed)
    
    R = R.persist() # This time, persist R (compute it but don't send it to the client
    R_inv = dask.delayed(Inverse)(R)

    X_da = X_da.persist()    # Persist again X_da, since X_da.to_delayed seems to cause troubles. If already in memory, this does nothing
    Q = X_da.map_blocks(MatMul, R_inv, dtype=X_da.dtype)
    Q = Q.persist() # Persist Q, so that it won't be sent directly to the client
    return Q, R

In [37]:
client.close()
cluster.close()