## Sgkit GWAS Workflow Example

This workflow simulates genotype, covariate, and outcome data, then runs per-variant linear regressions on it to emulate a large-scale GWAS.

See https://github.com/pystatgen/sgkit/issues/438 for more details.

In [1]:
%run setup.ipynb

In [2]:
%run matmul.ipynb

In [3]:
# Choose dataset size
#   n: number of variants (i.e. genomic locations)
#   m: number of individuals (i.e. people)
#   c: number of covariates (i.e. confounders)

# Debug settings (overridden by the active configuration below)
n, m, c = 1000, 1000, 3

# Representative settings for single (small) UK Biobank chromosome:
# n, m, c = 141910, 365941, 25

# XY chromosome, with the variant count scaled up by a factor of 8
n, m, c = 8444 * 8, 365941, 25

In [15]:
# Whether to persist the input data in cluster memory.
# When True, the rechunked genotype array is persisted and waited on before
# the regressions are defined.
persist = True

In [5]:
# Choose type of Dask cluster to run on
#cluster_type = "threads"
#cluster_type = "local"
cluster_type = "dist"
# NOTE(review): n_workers is also embedded in the performance report filename
# written at the end of the notebook. The recorded client output lists 16
# workers, so presumably the "dist" cluster type does not honour this value --
# confirm against get_dask_cluster (defined outside the visible cells).
n_workers = 1
client = get_dask_cluster(cluster_type, n_workers=n_workers)
client

0,1
Client  Scheduler: tcp://10.142.0.4:8786  Dashboard: http://10.142.0.4:8787/status,Cluster  Workers: 16  Cores: 128  Memory: 505.00 GB


In [6]:
# Choose type of storage to use
# "gs" makes the dataset/result paths gs:// URLs under the bucket; any other
# value (e.g. "file") makes them plain relative paths.
#storage = "file"
storage = "gs"
# Filesystem handle used later for existence checks and for removing old output
fs = fsspec.filesystem(storage)
fs

<gcsfs.core.GCSFileSystem at 0x7f10403154f0>

In [7]:
# Location of the simulated input dataset; only GCS storage gets a bucket prefix
path = (
    f"gs://{gcs_bucket}/sim_ds_{n}_{m}_{c}.zarr"
    if storage == "gs"
    else f"sim_ds_{n}_{m}_{c}.zarr"
)
path

'gs://rs-gwas-benchmark/sim_ds_67552_365941_25.zarr'

In [8]:
# Location of the regression results; only GCS storage gets a bucket prefix
output_path = (
    f"gs://{gcs_bucket}/sim_res_{n}_{m}_{c}.zarr"
    if storage == "gs"
    else f"sim_res_{n}_{m}_{c}.zarr"
)
output_path

'gs://rs-gwas-benchmark/sim_res_67552_365941_25.zarr'

In [9]:
# Create the dataset on cloud storage if not already present.
# NOTE: an existing store is reused as-is; delete it to regenerate the data
# after changing the simulation below.
if not fs.exists(path):
    with ProgressBar():
        rs = da.random.RandomState(0)
        # Raw genotype proxy: integers in [0, 128), laid out as (variants, samples)
        XL = rs.randint(0, 128, size=(n, m), chunks=(5216, 5792))
        # True per-variant effects: only the first variant influences the outcome.
        # This vector must have one entry per variant (length n), not per sample.
        BL = da.array([1] + [0] * (n - 1))
        # Per-sample covariates and their true effects
        XC, BC = rs.normal(size=(m, c)), rs.normal(size=(c,))
        # Outcome per sample = variant signal + covariate signal + small noise.
        # BL is broadcast along the variant axis so that summing over axis 0
        # yields sum_i BL[i] * XL[i, j] for each sample j.
        Y = (XL * BL[:, np.newaxis]).sum(axis=0) + XC @ BC + rs.normal(scale=.001, size=m)
        ds = xr.Dataset(dict(
            # This is a proxy for discretized allele dosages (between 0 and 2)
            XL=(('variants', 'samples'), (2 * XL / 127).astype('f2')),
            # This value represents covariates for samples, e.g. age, sex, ancestry, etc.
            XC=(('samples', 'covariates'), XC.astype('f4')),
            # This is the outcome on which all variant data will be regressed separately
            Y=(('samples', 'outcomes'), Y[:, np.newaxis].astype('f4')),
        ))
        print(f'Saving simulated data to {path}')
        ds.to_zarr(fsspec.get_mapper(path), mode='w', consolidated=True)

In [10]:
# Re-open the simulated dataset from its consolidated Zarr store
ds = xr.open_zarr(fsspec.get_mapper(path), consolidated=True)
ds

Unnamed: 0,Array,Chunk
Bytes,36.59 MB,36.59 MB
Shape,"(365941, 25)","(365941, 25)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 36.59 MB 36.59 MB Shape (365941, 25) (365941, 25) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",25  365941,

Unnamed: 0,Array,Chunk
Bytes,36.59 MB,36.59 MB
Shape,"(365941, 25)","(365941, 25)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,49.44 GB,60.42 MB
Shape,"(67552, 365941)","(5216, 5792)"
Count,833 Tasks,832 Chunks
Type,float16,numpy.ndarray
"Array Chunk Bytes 49.44 GB 60.42 MB Shape (67552, 365941) (5216, 5792) Count 833 Tasks 832 Chunks Type float16 numpy.ndarray",365941  67552,

Unnamed: 0,Array,Chunk
Bytes,49.44 GB,60.42 MB
Shape,"(67552, 365941)","(5216, 5792)"
Count,833 Tasks,832 Chunks
Type,float16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.46 MB,23.17 kB
Shape,"(365941, 1)","(5792, 1)"
Count,65 Tasks,64 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.46 MB 23.17 kB Shape (365941, 1) (5792, 1) Count 65 Tasks 64 Chunks Type float32 numpy.ndarray",1  365941,

Unnamed: 0,Array,Chunk
Bytes,1.46 MB,23.17 kB
Shape,"(365941, 1)","(5792, 1)"
Count,65 Tasks,64 Chunks
Type,float32,numpy.ndarray


In [11]:
def gwas(XL, XC, Y):
    """Regress each outcome on every variant separately, adjusting for covariates.

    An intercept column is prepended to ``XC``; ``XC`` is then projected out of
    both ``XL`` and ``Y`` via least squares, and a single-predictor regression
    is fit for every (variant, outcome) pair on the residuals.

    Parameters
    ----------
    XL : dask array, shape (samples, variants)
        Variant data; each column is regressed on separately.
    XC : dask array, shape (samples, covariates)
        Covariates shared by every regression (without an intercept column).
    Y : dask array, shape (samples, outcomes)
        Outcomes.

    Returns
    -------
    xarray.Dataset
        ``beta`` and ``pval``, each with dimensions ``(variants, outcomes)``.
    """
    # Add intercept
    XC = da.concatenate([da.ones((XC.shape[0], 1), dtype=XC.dtype), XC], axis=1)

    # Rechunk along short axes. The sample axis of XC follows XL's own chunking
    # instead of a hard-coded chunk size, so the blocks line up for any input.
    XC = XC.rechunk((XL.chunks[0], -1))
    Y = Y.rechunk((None, -1))
    # Residual degrees of freedom: samples - covariates (incl. intercept) - 1 variant
    dof = Y.shape[0] - XC.shape[1] - 1

    # Apply orthogonal projection to eliminate core covariates.
    # `matmul` is the notebook's helper used in place of the `@` operator
    # (presumably loaded by `%run matmul.ipynb` -- confirm).
    XLP = XL - matmul(XC, da.linalg.lstsq(XC, XL)[0])
    YP = Y - matmul(XC, da.linalg.lstsq(XC, Y)[0])

    # Estimate coefficients for each loop covariate
    XLPS = (XLP ** 2).sum(axis=0, keepdims=True).T  # (variants, 1) sums of squares
    B = matmul(XLP.T, YP) / XLPS

    # Compute residuals for each loop covariate and outcome separately;
    # YR has shape (samples, variants, outcomes)
    YR = YP[:, np.newaxis, :] - XLP[..., np.newaxis] * B[np.newaxis, ...]
    RSS = (YR ** 2).sum(axis=0)

    # Get t-statistics for coefficient estimates and match to p-values
    T = B / np.sqrt(RSS / dof / XLPS)
    # Two-sided p-value from the t survival function, evaluated block by block
    P = da.map_blocks(
        lambda t: 2 * stats.distributions.t.sf(np.abs(t), dof), T, dtype="float64"
    )
    return xr.Dataset(dict(
        beta=(('variants','outcomes'), B),
        pval=(('variants','outcomes'), P)
    ))

In [16]:
# Define the GWAS regressions
# Note: XL (the largest array) needs to be rechunked due to scalability
# issues with da.matmul, specifically https://github.com/dask/dask/pull/6924.
# See here for more details:
# https://github.com/pystatgen/sgkit/issues/390#issuecomment-730660134
XL = ds.XL.data.rechunk((652, 5792))
# Transpose to (samples, variants) and widen from the stored dtype to float32
XL = XL.T.astype('f4')
if persist:
    # Persist the array and wait on it before the regressions are defined
    XL = XL.persist()
    wait(XL)
dsr = gwas(XL, ds.XC.data, ds.Y.data)
dsr

  intermediate = blockwise(
  out = blockwise(


Unnamed: 0,Array,Chunk
Bytes,270.21 kB,2.61 kB
Shape,"(67552, 1)","(652, 1)"
Count,101040 Tasks,104 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 270.21 kB 2.61 kB Shape (67552, 1) (652, 1) Count 101040 Tasks 104 Chunks Type float32 numpy.ndarray",1  67552,

Unnamed: 0,Array,Chunk
Bytes,270.21 kB,2.61 kB
Shape,"(67552, 1)","(652, 1)"
Count,101040 Tasks,104 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,540.42 kB,5.22 kB
Shape,"(67552, 1)","(652, 1)"
Count,137192 Tasks,104 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 540.42 kB 5.22 kB Shape (67552, 1) (652, 1) Count 137192 Tasks 104 Chunks Type float64 numpy.ndarray",1  67552,

Unnamed: 0,Array,Chunk
Bytes,540.42 kB,5.22 kB
Shape,"(67552, 1)","(652, 1)"
Count,137192 Tasks,104 Chunks
Type,float64,numpy.ndarray


In [19]:
# Remove any previous results store so the run below writes to a clean target
if fs.exists(output_path):
    print(f"Deleting existing output {output_path}")
    fs.rm(output_path, recursive=True)

Deleting existing output gs://rs-gwas-benchmark/sim_res_67552_365941_25.zarr


In [20]:
%%time
# Without a client, show a progress bar; otherwise capture a performance
# report whose filename encodes the run configuration
cm = (
    ProgressBar()
    if client is None
    else performance_report(f"reports/pr_{n}_{m}_{c}_{cluster_type}_{n_workers}_{storage}_tmp.html")
)
with cm:
    # Compute and save betas/p-values
    dsr.to_zarr(fsspec.get_mapper(output_path), mode='w', consolidated=True)
    print(f'Results saved to {output_path}')

Results saved to gs://rs-gwas-benchmark/sim_res_67552_365941_25.zarr
CPU times: user 16.6 s, sys: 320 ms, total: 16.9 s
Wall time: 3min 42s
