# Distributed Matrix Data Structure and Its Statistical Applications on PyTorch

**NOTE: This notebook is incomplete. At this time, it is uploaded for testing purposes, in preparation for the tutorials to be presented in the programming workshop at the inaugural Lange's Symposium on Feb 21-22, 2020.**

## Synopsis

We developed a distributed matrix operation package suitable for distributed matrix-vector operations and distributed tall-and-thin (or wide-and-short) matrices.
The code runs on both multi-node machines and multi-GPU machines using PyTorch.
We apply this package to four statistical applications, namely, nonnegative matrix factorization (NMF), multidimensional scaling (MDS), positron emission tomography (PET), and $\ell_1$-regularized Cox regression.
In particular, the $\ell_1$-regularized Cox regression on the UK Biobank dataset is, to our knowledge, the largest multivariate survival analysis to date.
In this workshop, we provide small examples that run on a single node, and demonstrate multi-GPU usage on our own machine.

## Introduction to PyTorch

What is PyTorch?

## Basic PyTorch Operations

We introduce simple operations in PyTorch. Note that Python uses 0-based indexing and PyTorch stores tensors in row-major order, like C and C++ (R uses 1-based indexing and column-major order). First we import the PyTorch
library. This is equivalent to `library()` in R.

In [1]:
import torch

In [2]:
torch.__version__

'1.1.0'
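To make the 0-based, row-major convention concrete, here is a minimal sketch. It uses `torch.arange()` and `.stride()`, which are not introduced elsewhere in this notebook, and the variable name `a` is only for illustration.

```python
import torch

# A 3 x 4 tensor holding 0, 1, ..., 11 in storage order.
a = torch.arange(12).view(3, 4)
print(a)

# Row-major: moving one row down skips 4 stored elements,
# moving one column to the right skips 1.
print(a.stride())        # (4, 1)

# 0-based indexing: the top-left element is a[0, 0].
print(a[0, 0].item())    # 0
print(a[1, 0].item())    # 4, the first element of the second row
```

In R, `matrix(0:11, 3, 4)` would instead fill the matrix column by column.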

### Tensor Creation

One may create an uninitialized tensor. This creates a 3 × 4 tensor (matrix).

In [3]:
torch.empty(3, 4) # uninitialized tensor

tensor([[7.1466e-44, 6.1657e-44, 4.4842e-44, 7.2868e-44],
        [5.7453e-44, 4.4842e-44, 4.9045e-44, 4.4842e-44],
        [1.6395e-43, 1.5414e-43, 1.4714e-43, 1.5414e-43]])

The following is equivalent to `set.seed()` in R.

In [4]:
torch.manual_seed(100)

<torch._C.Generator at 0x7f1ccc1626d0>

This generates a tensor initialized with random values drawn uniformly from [0, 1).

In [5]:
y = torch.rand(3, 4) # from Unif(0, 1)
y

tensor([[0.1117, 0.8158, 0.2626, 0.4839],
        [0.6765, 0.7539, 0.2627, 0.0428],
        [0.2080, 0.1180, 0.1217, 0.7356]])
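As a small check of what seeding provides, the sketch below resets the seed and draws again. The names `a` and `b` are illustrative only.

```python
import torch

torch.manual_seed(100)
a = torch.rand(3)

torch.manual_seed(100)      # reset the generator to the same state
b = torch.rand(3)

# The two draws coincide because the generator state was identical.
print(torch.equal(a, b))    # True
```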

We can also generate a tensor filled with zeros or ones.

In [6]:
z = torch.ones(3, 4) # torch.zeros(3, 4)
z

tensor([[1., 1., 1., 1.],
        [1., 1., 1., 1.],
        [1., 1., 1., 1.]])

A tensor can be created from standard Python data.

In [7]:
w = torch.tensor([3, 4, 5, 6])
w

tensor([3, 4, 5, 6])

A tensor can be created with a chosen datatype and on a chosen device. By default, the datatype is inferred from the data (float32 for floating-point values) and the device is the CPU.

In [8]:
# double precision
w = torch.tensor([3, 4, 5, 6], dtype=torch.float64)
w

tensor([3., 4., 5., 6.], dtype=torch.float64)

In [9]:
# on GPU number zero. will not run if CUDA GPU is not present.
w = torch.tensor([3, 4, 5, 6], device='cuda:0')
w

tensor([3, 4, 5, 6], device='cuda:0')
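Since the cell above fails on a machine without a CUDA GPU, a common pattern is to choose the device at run time. This is a sketch of that pattern, not part of the original notebook; the variable name `device` is an assumption.

```python
import torch

# Fall back to the CPU when no CUDA GPU is visible.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

w = torch.tensor([3, 4, 5, 6], device=device)
print(w.device)
```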

The shape of a tensor can be accessed by appending `.shape` to the tensor name.

In [10]:
z.shape

torch.Size([3, 4])

### Casting

The datatype and location of a tensor can be changed with the method `.to()`, which returns the converted tensor. The arguments are the same as those used to choose the datatype and device when creating a tensor.

In [11]:
w = w.to(device = "cpu", dtype=torch.int32)
w

tensor([3, 4, 5, 6], dtype=torch.int32)
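Note that `.to()` does not modify the tensor it is called on, which is why the result is assigned back to `w` above. A minimal sketch, with illustrative names `v` and `u`:

```python
import torch

w = torch.tensor([3, 4, 5, 6])       # int64, inferred from the integer data
v = w.to(dtype=torch.float64)        # a converted copy

print(w.dtype)    # torch.int64: the original is unchanged
print(v.dtype)    # torch.float64

# When no conversion is needed, .to() returns the tensor itself.
u = w.to(dtype=torch.int64)
print(u is w)     # True
```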

### Indexing

The following are standard methods of indexing tensors.

In [12]:
y[2, 3] # indexing: zero-based, returns a 0-dimensional tensor

tensor(0.7356)

Indexing always returns a (sub)tensor, even for scalars (treated as zero-dimensional tensors).
A standard Python number can be obtained by using `.item()`.

In [13]:
y[2, 3].item() # A standard Python floating-point number

0.7355988621711731

To get a column from a tensor, we index as below. The syntax is similar to, but slightly
different from, that of R.

In [14]:
y[:, 3] # 3rd column. The leftmost column is 0th. cf. y[, 4] in R

tensor([0.4839, 0.0428, 0.7356])

The following is for taking a row.

In [15]:
y[2, :] # 2nd row. The top row is 0th. cf. y[3, ] in R

tensor([0.2080, 0.1180, 0.1217, 0.7356])

### Simple operations

Here we provide examples of simple operations in PyTorch. Addition using the operator `+` behaves
just as one would expect:

In [16]:
x = y + z # a simple addition.
x

tensor([[1.1117, 1.8158, 1.2626, 1.4839],
        [1.6765, 1.7539, 1.2627, 1.0428],
        [1.2080, 1.1180, 1.1217, 1.7356]])

Here is another form of addition.

In [17]:
x = torch.add(y, z) # another syntax for addition

Methods ending with an underscore (`_`) change the value of the tensor in place. Otherwise, the arguments are never changed. Unlike the convention for functions ending with `!` in Julia, this rule is strictly enforced in PyTorch. (The underscore determines the use of the keyword `const` at the C++ level.)

In [18]:
y.add_(z) # in-place addition

tensor([[1.1117, 1.8158, 1.2626, 1.4839],
        [1.6765, 1.7539, 1.2627, 1.0428],
        [1.2080, 1.1180, 1.1217, 1.7356]])
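The difference between the two forms can be seen on a small example. This is a sketch with illustrative tensors `a`, `b`, `c`, and `d`; `.data_ptr()` (the address of the underlying storage) is used only to show that no new memory is allocated.

```python
import torch

a = torch.zeros(3)
b = torch.ones(3)

c = a.add(b)       # out-of-place: returns a new tensor, a is untouched
print(a)           # tensor([0., 0., 0.])
print(c)           # tensor([1., 1., 1.])

d = a.add_(b)      # in-place: a itself is overwritten and returned
print(a)           # tensor([1., 1., 1.])
print(d.data_ptr() == a.data_ptr())   # True: same storage
```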

### Concatenation

We can concatenate tensors using the function `cat()`, which resembles `c()`, `cbind()`, and
`rbind()` in R. The second argument indicates the dimension along which the tensors are
concatenated: zero stacks the rows (like `rbind()`), and one places the columns side by side (like `cbind()`).

In [19]:
torch.cat((y, z), 0) # along the rows

tensor([[1.1117, 1.8158, 1.2626, 1.4839],
        [1.6765, 1.7539, 1.2627, 1.0428],
        [1.2080, 1.1180, 1.1217, 1.7356],
        [1.0000, 1.0000, 1.0000, 1.0000],
        [1.0000, 1.0000, 1.0000, 1.0000],
        [1.0000, 1.0000, 1.0000, 1.0000]])

In [20]:
torch.cat((y, z), 1) # along the columns

tensor([[1.1117, 1.8158, 1.2626, 1.4839, 1.0000, 1.0000, 1.0000, 1.0000],
        [1.6765, 1.7539, 1.2627, 1.0428, 1.0000, 1.0000, 1.0000, 1.0000],
        [1.2080, 1.1180, 1.1217, 1.7356, 1.0000, 1.0000, 1.0000, 1.0000]])
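For one-dimensional tensors, `cat()` plays the role of `c()` in R. The sketch below also shows `torch.stack()`, a related function not used elsewhere in this notebook, which creates a new dimension instead of extending an existing one.

```python
import torch

a = torch.tensor([1., 2.])
b = torch.tensor([3., 4.])

# Concatenating vectors along their only dimension, like c(a, b) in R.
print(torch.cat((a, b)))               # tensor([1., 2., 3., 4.])

# Stacking two length-2 vectors as the rows of a 2 x 2 matrix, like rbind(a, b).
print(torch.stack((a, b), 0).shape)    # torch.Size([2, 2])
```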

### Reshaping

One can reshape a tensor, like changing the attribute `dim` in R.

In [21]:
y.view(12) # 1-dimensional array

tensor([1.1117, 1.8158, 1.2626, 1.4839, 1.6765, 1.7539, 1.2627, 1.0428, 1.2080,
        1.1180, 1.1217, 1.7356])

At most one of the arguments of `view()` can be −1. The size along that dimension is inferred
from the other dimensions.

In [22]:
# reshape into (6)-by-2 tensor;
# (6) is inferred from the other dimension
y.view(-1, 2)

tensor([[1.1117, 1.8158],
        [1.2626, 1.4839],
        [1.6765, 1.7539],
        [1.2627, 1.0428],
        [1.2080, 1.1180],
        [1.1217, 1.7356]])
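Two points are worth checking on a small case: the elements are laid out in row-major order after reshaping, and a view shares storage with the original tensor. A minimal sketch, using a fresh tensor `y` built with `torch.arange()` rather than the `y` above:

```python
import torch

y = torch.arange(12.).view(3, 4)    # 0., 1., ..., 11. in row-major order
v = y.view(-1, 2)                   # 12 elements / 2 columns -> 6 rows
print(v.shape)                      # torch.Size([6, 2])
print(v[1])                         # tensor([2., 3.])

# A view does not copy: writing through v changes y as well.
v[0, 0] = 100.
print(y[0, 0].item())               # 100.0
```

This differs from assigning `dim` in R, where the elements are arranged column by column.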

### Basic statistics

Calling the `.sum()`, `.mean()`, and `.std()` methods of a tensor does the obvious. An optional argument determines the dimension(s) of reduction.

In [23]:
y

tensor([[1.1117, 1.8158, 1.2626, 1.4839],
        [1.6765, 1.7539, 1.2627, 1.0428],
        [1.2080, 1.1180, 1.1217, 1.7356]])

In [24]:
y.sum()

tensor(16.5933)

In [25]:
y.sum(0) # reduces rows, columnwise sum

tensor([3.9962, 4.6878, 3.6469, 4.2623])

In [26]:
y.sum(1) # reduces columns, rowwise sum

tensor([5.6739, 5.7359, 5.1834])

In [27]:
y.sum((0, 1)) # reduces rows and columns -> a single number.

tensor(16.5933)

In [28]:
y.mean()

tensor(1.3828)

In [29]:
y.mean(0)

tensor([1.3321, 1.5626, 1.2156, 1.4208])

In [30]:
y.std(1)

tensor([0.3058, 0.3384, 0.2961])
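By default, `.std()` computes the sample standard deviation (dividing by n − 1), as `sd()` does in R. The sketch below verifies this by hand on a small illustrative tensor; the `keepdim` argument keeps the reduced dimension so that the means can be subtracted row by row.

```python
import torch

y = torch.tensor([[1., 2., 3., 4.],
                  [2., 4., 6., 8.]])

# Standard deviation of each row.
print(y.std(1))

# The same quantity computed by hand with the n - 1 divisor.
n = y.shape[1]
centered = y - y.mean(1, keepdim=True)
manual = (centered.pow(2).sum(1) / (n - 1)).sqrt()
print(torch.allclose(y.std(1), manual))    # True
```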

### Linear Algebra

Matrix transpose is performed by appending `.t()` to a tensor. Matrix multiplication is carried out by the function `torch.mm()`.

In [31]:
torch.mm(y, z.t())

tensor([[5.6739, 5.6739, 5.6739],
        [5.7359, 5.7359, 5.7359],
        [5.1834, 5.1834, 5.1834]])
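A few related operations, shown as a sketch on small illustrative matrices: the `@` operator is an alternative to `torch.mm()`, `*` multiplies elementwise, and `torch.mv()` computes a matrix-vector product.

```python
import torch

A = torch.tensor([[1., 2.], [3., 4.], [5., 6.]])    # 3 x 2
B = torch.tensor([[1., 0., 1.], [0., 1., 1.]])      # 2 x 3

C = torch.mm(A, B)                     # 3 x 3 matrix product
print(torch.allclose(C, A @ B))        # True: @ also performs matrix multiplication

# Elementwise product, as with * in R.
print(A * A)

# Matrix-vector product.
x = torch.tensor([1., -1.])
print(torch.mv(A, x))                  # tensor([-1., -1., -1.])
```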

## `torch.distributed`: Distributed subpackage for PyTorch

`torch.distributed` is the subpackage for distributed operations on PyTorch. The interface is mostly inspired by the message passing interface (MPI). The available backends are:

* Gloo, a collective communication library developed by Facebook and included in PyTorch. Full support for CPU; on GPU, only part of the collective communication is supported.
* MPI, the good old communication standard. The most flexible, but PyTorch needs to be compiled from source to use it as a backend. Full support for GPU if the MPI installation is "CUDA-aware".
* NCCL, the NVIDIA Collective Communications Library. Collective communication only, for multiple GPUs (NCCL 2 and later also communicate across nodes).
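Which of these backends can be used depends on how the installed PyTorch was built. The following sketch queries the build; it only reports availability and does not start any communication.

```python
import torch.distributed as dist

# True if this PyTorch build includes the distributed package at all.
print(dist.is_available())

if dist.is_available():
    # Whether the build was compiled with MPI or NCCL support.
    print('MPI :', dist.is_mpi_available())
    print('NCCL:', dist.is_nccl_available())
```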

For this workshop, we use Gloo because it is fully functional on CPU and runs inside a Jupyter Notebook. The experiments in our paper use MPI, which runs the multi-node and multi-GPU settings with basically the same code. The interface below is specific to the Gloo backend. For the MPI backend, please consult the relevant section of the [distributed package tutorial](https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends) or [our code](https://github.com/kose-y/dist_stat/tree/master/examples).

In [32]:
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

def run_process(size, fn):
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, fn))
        p.start()
        processes.append(p)
        
    for p in processes:
        p.join()

### Point-to-point communication

![](https://pytorch.org/tutorials/_images/send_recv.png)
Figure courtesy of: https://pytorch.org/tutorials/_images/send_recv.png

In [33]:
"""Blocking point-to-point communication."""

def point_to_point(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    elif rank == 1:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    dist.barrier()
    print('Rank ', rank, ' has data ', tensor[0])

In [34]:
run_process(4, point_to_point)

Rank  3  has data  tensor(0.)
Rank  0  has data  tensor(1.)
Rank  2  has data  tensor(0.)
Rank  1  has data  tensor(1.)


### Collective communication

| | | 
|:---|:---|
| ![](https://pytorch.org/tutorials/_images/scatter.png) | ![](https://pytorch.org/tutorials/_images/gather.png) |
| Scatter | Gather |
| ![](https://pytorch.org/tutorials/_images/reduce.png) | ![](https://pytorch.org/tutorials/_images/all_reduce.png) |
| Reduce | All-reduce |
| ![](https://pytorch.org/tutorials/_images/broadcast.png) | ![](https://pytorch.org/tutorials/_images/all_gather.png) |
| Broadcast | All-gather |

Table courtesy of: https://pytorch.org/tutorials/intermediate/dist_tuto.html#communication-backends


In [35]:
def broadcast(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor[0] = 7
    dist.broadcast(tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

In [36]:
run_process(4, broadcast)

Rank  0  has data  tensor(7.)
Rank  2  has data  tensor(7.)
Rank  1  has data  tensor(7.)
Rank  3  has data  tensor(7.)


In [37]:
def reduce(rank, size):
    tensor = torch.ones(1)
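    # Sum the tensors from all ranks onto rank 3. Only rank 3 is guaranteed
    # to hold the total; the other ranks may be left with partial sums.
    dist.reduce(tensor, dst=3)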
    print('Rank ', rank, ' has data ', tensor[0])

In [38]:
run_process(4, reduce)

Rank  0  has data  tensor(4.)
Rank  2  has data  tensor(2.)
Rank  3  has data  tensor(4.)
Rank  1  has data  tensor(3.)


In [39]:
def all_reduce(rank, size):
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    print('Rank ', rank, ' has data ', tensor[0])

In [40]:
run_process(4, all_reduce)

Rank  2  has data  tensor(4.)
Rank  0  has data  tensor(4.)
Rank  3  has data  tensor(4.)
Rank  1  has data  tensor(4.)


In [41]:
def all_gather(rank, size):
    tensors = [torch.zeros(1) for i in range(size)]
    dat = torch.zeros(1)
    dat[0] += rank
    dist.all_gather(tensors, dat)
    print('Rank ', rank, ' has data ', tensors)

In [42]:
run_process(4, all_gather)

Rank  2  has data  [tensor([0.]), tensor([1.]), tensor([2.]), tensor([3.])]
Rank  3  has data  [tensor([0.]), tensor([1.]), tensor([2.]), tensor([3.])]
Rank  1  has data  [tensor([0.]), tensor([1.]), tensor([2.]), tensor([3.])]
Rank  0  has data  [tensor([0.]), tensor([1.]), tensor([2.]), tensor([3.])]
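To summarize what each collective computes, here is a single-process simulation in which a Python list stands in for the set of ranks. No communication takes place; this is only a sketch of the semantics, and all variable names are illustrative.

```python
import torch

size = 4
# One tensor per simulated rank; rank r holds the value r.
local = [torch.tensor([float(r)]) for r in range(size)]

# broadcast from rank 0: every rank ends up with rank 0's tensor.
after_broadcast = [local[0].clone() for _ in range(size)]

# reduce (sum): the total 0 + 1 + 2 + 3 is delivered to one destination rank.
total = torch.stack(local).sum(0)

# all_reduce: every rank receives the total.
after_all_reduce = [total.clone() for _ in range(size)]

# all_gather: every rank receives the list of all contributions.
after_all_gather = [[t.clone() for t in local] for _ in range(size)]

print(total)                   # tensor([6.])
print(after_all_gather[2])     # [tensor([0.]), tensor([1.]), tensor([2.]), tensor([3.])]
```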


## `distmat`: Distributed Matrices on PyTorch

Using the tensor operations and communication package, we created a data structure for a distributed matrix. In this structure, each process,
enumerated by its rank, holds a contiguous block of the full data matrix by rows or columns.
The data may be a sparse matrix. If GPUs are involved, each process controls a GPU whose
index matches the process rank. For notational simplicity, we denote the dimension to split
in square brackets. If a [100] × 100 matrix is split over four processes, the process with rank
0 keeps the first 25 rows of the matrix, and the rank 3 process takes the last 25 rows. For
the sake of simplicity, we always assume that the size along the split dimension is divisible
by the number of processes.
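The row ranges in this example can be computed as follows. The helper `row_block()` is hypothetical and written only for illustration; it is not part of `distmat`.

```python
def row_block(rank, size, n_rows):
    """Half-open row range [start, stop) held by `rank` (hypothetical helper)."""
    # The split dimension is assumed to be divisible by the number of processes.
    assert n_rows % size == 0
    block = n_rows // size
    return rank * block, (rank + 1) * block

for rank in range(4):
    print(rank, row_block(rank, 4, 100))
# 0 (0, 25)
# 1 (25, 50)
# 2 (50, 75)
# 3 (75, 100)
```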

### Creation

### Elementwise operations

Some of the basic functions work naturally. 

In [None]:
C = A + B

In [None]:
A.log()

For general functions, we have `.apply()`, `.apply_binary()`, `.apply_inplace()`, and `.apply_inplace_binary()`.

### Reductions (sum, product, max, min)

### Matrix multiplications

Six different scenarios of matrix-matrix multiplications, each representing a different configuration of the split dimension of two input
matrices and the output matrix, were considered and implemented. 

< Content of Table 1 here >



The implementation of each case is carried
out using the collective communication directives. The matrix multiplication scenario is selected automatically based on the shapes of the input matrices A and
B, except for Scenarios 1 and 3, which share the same input structure. Those two are further
distinguished by the shape of the output, AB. Nonnegative matrix factorization involves Scenarios 1 to 5.
Scenario 6 is for matrix-vector multiplications, where broadcasting small vectors is almost
always efficient.
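To give a feel for why such products map onto collective communication, the following single-process sketch simulates two typical cases for a matrix split by rows. It is not the `distmat` implementation and is not tied to a particular scenario number; the list `blocks` stands in for the pieces held by each rank.

```python
import torch

torch.manual_seed(0)
size = 4                                     # number of simulated processes
A = torch.rand(8, 3, dtype=torch.float64)    # an [8] x 3 matrix, split by rows
x = torch.rand(3, dtype=torch.float64)       # a small vector, replicated on every rank

blocks = torch.chunk(A, size, dim=0)         # rank r would hold blocks[r]

# A x: each rank multiplies its own rows by the broadcast vector;
# the result stays split by rows.
local_Ax = [blk @ x for blk in blocks]
print(torch.allclose(torch.cat(local_Ax), A @ x))     # True

# A^T A: each rank forms a small 3 x 3 matrix from its rows; summing them
# (the job of an all-reduce) yields the full product.
local_gram = [blk.t() @ blk for blk in blocks]
print(torch.allclose(sum(local_gram), A.t() @ A))     # True
```

In both cases the large matrix never leaves the process that holds it; only small vectors or small matrices are communicated.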

## Nonnegative Matrix Factorization (NMF)

The following code is a simplified version. The full object-oriented version is available at the [GitHub repo](https://github.com/kose-y/dist_stat).

In [None]:
def nmf(rank, size):
    # TODO: initialization here
    for i in range(maxiter):
        # Update V elementwise: V <- V * (X W^T) / (V W W^T)
        XWt =  distmat.mm(data, W.t())
        WWt =  distmat.mm(W, W.t())
        VWWt = distmat.mm(V, WWt)
        V.mul_(XWt).div_(VWWt)

        # Update W elementwise: W <- W * (V^T X) / (V^T V W)
        VtX  = distmat.mm(V.t(), data, out_sizes=W.sizes)
        VtV  = distmat.mm(V.t(), V)
        VtVW = distmat.mm(VtV, W)
        W = W.mul_(VtX).div_(VtVW)
        if i % 10 == 0:
            # print obj
            pass

## $\ell_1$-regularized Cox Regression

In [None]:
def cox_l1(rank, size):
    # TODO: initialization here
    lambd = 0.01  # l1 penalty parameter
    soft_threshold = torch.nn.Softshrink(lambd)  # elementwise soft-thresholding
    for i in range(maxiter):
        Xbeta = distmat.mm(data, beta)  # linear predictor
        w = Xbeta.exp()
        W = w.cumsum(0)
        w_dist = distmat.dist_data(w, TType=w.TType)
        pi = (w_dist/W.t()) * pi_ind
        pd  = distmat.mm(pi, delta)
        dmpd = delta_dist - pd
        grad = distmat.mm(datat, dmpd)
        # gradient step with step size sigma, followed by soft-thresholding
        beta = (beta + grad * sigma).apply(soft_threshold)
        if i % 10 == 0:
            # print obj
            pass
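The operator `torch.nn.Softshrink(lambd)` used above shrinks each entry toward zero by `lambd` and sets entries with absolute value at most `lambd` to zero. The sketch below compares it with the formula sign(β) · max(|β| − λ, 0) on an illustrative vector; the value `lambd = 0.5` is chosen only to make the effect visible.

```python
import torch

lambd = 0.5
soft_threshold = torch.nn.Softshrink(lambd)

beta = torch.tensor([-2.0, -0.3, 0.0, 0.4, 1.5])

# Soft-thresholding written out by hand.
manual = beta.sign() * (beta.abs() - lambd).clamp(min=0)

shrunk = soft_threshold(beta)
print(shrunk)                            # entries with |beta| <= 0.5 become zero
print(torch.allclose(shrunk, manual))    # True
```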

## Multi-GPU Demonstration

We demonstrate 10,000 × 10,000 examples on 2-8 GPUs on our server.

## Multi-node

The data structure can also be utilized on multi-node clusters. The structure was used for the analysis of 200,000 × 500,000 UK Biobank data.

## Future Direction

An MPI-only, lightweight, and more flexible version in Julia is in preparation. In the process, CUDA-aware MPI support was added to [MPI.jl](https://github.com/JuliaParallel/MPI.jl), the central MPI interface for Julia.