# SSSP (Single-Source Shortest Paths) Example

In [None]:
import dask
import numpy as np
import grblas as gb
import dask_grblas as dgb
from grblas import op

In [None]:
from dask.distributed import Client
# Create a dask.distributed client. NOTE(review): with no arguments this
# presumably starts a local cluster — confirm against the dask.distributed docs.
client = Client()
# Bare expression as the last line of the cell so the notebook shows the client.
client

In [None]:
# Build a random, sparse, undirected adjacency mask for the example graph
N = 1000  # the mask is N x N
num_chunks = 4  # number of row-wise partitions
r = np.random.rand(N, N) < 0.001  # keep roughly 0.1% of all entries
r = np.logical_or(r, r.T)  # mirror across the diagonal -> symmetric
np.fill_diagonal(r, False)  # remove self edges

In [None]:
# Option 1: create distributed Matrix from local data
def to_matrix(chunk):
    """Turn a dense 2-D array chunk into a dask-grblas Matrix.

    Every nonzero position of ``chunk`` becomes a stored entry whose value
    is drawn uniformly at random from [0, 1). The resulting matrix has the
    same shape as ``chunk``.
    """
    row_idx, col_idx = np.nonzero(chunk)
    nrows, ncols = chunk.shape
    weights = np.random.rand(len(row_idx))
    return dgb.Matrix.from_values(row_idx, col_idx, weights, nrows=nrows, ncols=ncols)
# Split the mask row-wise, convert every piece, then stack the pieces
# back into one distributed matrix.
chunks = np.array_split(r, num_chunks, axis=0)
delayed_chunks = list(map(to_matrix, chunks))
A = dgb.row_stack(delayed_chunks)
# One randomly chosen index holds the value 0; all other entries are empty.
sources = dgb.Vector.from_values(np.random.randint(N), 0, size=N, dtype=A.dtype)

In [None]:
# Option 2: create distributed Matrix from distributed (delayed) data
chunks = np.array_split(r, num_chunks, axis=0)
ncols = chunks[0].shape[1]
row_lengths = np.array([len(chunk) for chunk in chunks])
# Exclusive prefix sum: the global index of the first row of each chunk
row_offsets = row_lengths.cumsum() - row_lengths

# Locate the nonzero entries of every chunk once, then derive the three
# parallel lists (global row indices, column indices, random values).
chunk_nonzeros = [np.nonzero(chunk) for chunk in chunks]
chunked_rows = [
    local_rows + offset
    for (local_rows, _), offset in zip(chunk_nonzeros, row_offsets)
]
chunked_cols = [col_idx for _, col_idx in chunk_nonzeros]
chunked_vals = [np.random.rand(local_rows.size) for local_rows, _ in chunk_nonzeros]

# Wrap every per-chunk array with dask.delayed, keeping rows, columns and
# values in three parallel lists.
delayed_rows, delayed_cols, delayed_vals = (
    [dask.delayed(array) for array in arrays]
    for arrays in (chunked_rows, chunked_cols, chunked_vals)
)

@dask.delayed
def to_matrix(rows, cols, vals, nrows, ncols):
    """Build a grblas Matrix of shape (nrows, ncols) from index/value arrays.

    ``rows``, ``cols`` and ``vals`` are passed straight to
    ``gb.Matrix.from_values``; the function is wrapped with ``dask.delayed``.
    """
    # gb.Matrix.ss.import_csr is another possible way to build the matrix
    shape = {"nrows": nrows, "ncols": ncols}
    return gb.Matrix.from_values(rows, cols, vals, **shape)

# One delayed grblas Matrix per chunk; row indices are shifted back to
# chunk-local coordinates by subtracting the chunk's row offset.
delayed_matrices = [
    to_matrix(global_rows - offset, col_idx, values, chunk_nrows, ncols)
    for global_rows, col_idx, values, offset, chunk_nrows in zip(
        delayed_rows, delayed_cols, delayed_vals, row_offsets, row_lengths
    )
]

# Wrap each delayed grblas Matrix as a dask-grblas Matrix, passing the
# dtype (FP64) and shape explicitly.
delayed_chunks = [
    dgb.Matrix.from_delayed(matrix, gb.dtypes.FP64, chunk_nrows, ncols)
    for matrix, chunk_nrows in zip(delayed_matrices, row_lengths)
]

# Stack the per-chunk matrices row-wise into one distributed matrix
A = dgb.row_stack(delayed_chunks)
# One randomly chosen index holds the value 0; all other entries are empty.
sources = dgb.Vector.from_values(np.random.randint(N), 0, size=N, dtype=A.dtype)

In [None]:
# Reference answer: run the same iteration locally with plain grblas
B = A.compute()
v = sources.dup().compute()
v_dup = gb.Vector.new(v.dtype, size=N)
i = 0
converged = False
while not converged:
    i += 1
    v_dup << v  # snapshot of v before this round
    v(op.min) << B.mxv(v, op.min_plus)  # min-plus product, accumulated with min
    converged = v.isequal(v_dup)  # stop once a round changes nothing
expected = v
i

In [None]:
# Same iteration, this time on the distributed dask-grblas objects
i = 0
v = sources.dup()
converged = False
while not converged:
    i += 1
    v_dup = v.dup()  # snapshot of v before this round
    v(op.min) << A.mxv(v, op.min_plus)  # min-plus product, accumulated with min
    # Persist so the result is not recomputed on every iteration
    # (alternative noted in the original: persist(scheduler='synchronous')).
    v._delayed = v._delayed.persist()
    converged = v.isequal(v_dup)  # stop once a round changes nothing
i

In [None]:
# Check that the dask-grblas result matches the reference computed with grblas
assert expected.isequal(v.compute())

In [None]:
# Bare expression so the notebook displays the resulting vector
expected