In [1]:
import coiled
import dask
import dask.array as da
import fsspec, zarr
import numpy as np
from bokeh.io import output_notebook
from dask.diagnostics import ProgressBar, ResourceProfiler
from dask.distributed import Client, performance_report, get_task_stream

from sgkit.distance.api import pairwise_distance_blocks, pairwise_distance_blockwise

In [2]:
# Display the installed coiled client version so the run can be reproduced.
coiled.__version__

'0.0.35'

In [3]:
# Display the installed dask version so the run can be reproduced.
# (`dask` itself is imported in the notebook's first cell with the other imports.)
dask.__version__

'2.30.0'

In [4]:
# Show the working directory; the relative "environment.yml" path passed to
# coiled further down is presumably resolved against it.
!pwd

/Users/aktech/dev/sgkit


In [5]:
# Name used below for both the Coiled software environment and the cluster
# configuration. The commented-out value is an alternative name kept for reference.
# env_name = "sgkit-blockwise-new"
env_name = "pairwise-comparison-clean"

In [6]:
%%time
# Build (or update) the Coiled software environment called `env_name` from the
# conda specification in environment.yml. The build log is streamed as output.
coiled.create_software_environment(name=env_name, conda="environment.yml")

Updating software environment...
Solving conda environment...
Conda environment solved!
Building Docker image
(this takes a few minutes)
STEP 1: FROM coiled/default:sha-9aa53a2
STEP 2: COPY environment.yml environment.yml
--> fc9246144e4
STEP 3: RUN conda env update -n coiled -f environment.yml     && rm environment.yml     && conda clean --all -y     && echo "conda activate coiled" >> ~/.bashrc
Collecting package metadata (repodata.json): ...working... done
Solving environment: ...working... done

Downloading and Extracting Packages
chardet-3.0.4        | 170 KB    | ########## | 100% 
libsodium-1.0.18     | 366 KB    | ########## | 100% 
typing_extensions-3. | 25 KB     | ########## | 100% 
rsa-4.7              | 27 KB     | ########## | 100% 
pyopenssl-20.0.1     | 48 KB     | ########## | 100% 
libgfortran-ng-9.3.0 | 22 KB     | ########## | 100% 
six-1.15.0           | 14 KB     | ########## | 100% 
bleach-3.3.0         | 111 KB    | ########## | 100% 
partd-1.1.0          | 17 KB

In [7]:
# Register a cluster configuration that uses `env_name` both as its own name
# and as its software environment; each worker is sized at 4 CPUs / 16 GiB.
coiled.create_cluster_configuration(
    name=env_name,
    software=env_name,
    worker_cpu=4,
    worker_memory="16 GiB",
)

In [8]:
%%time
# Options handed to every worker: 4 threads (matching worker_cpu=4 in the
# cluster configuration) and a 16 GB memory limit.
worker_options = {
    "nthreads": 4,
    # Environment overrides for the workers; the thread-count settings below
    # are currently disabled, so the mapping is empty.
    "env": {
        # "OMP_NUM_THREADS": "2",
        # "OPENBLAS_NUM_THREADS": "1",
    },
    "memory_limit": "16 GB",
}

# Launch a 50-worker cluster from the configuration registered under this account.
cluster = coiled.Cluster(
    configuration=f"aktech/{env_name}",
    n_workers=50,
    worker_options=worker_options,
)

# Alternative kept for reference (presumably reconnects to an existing named
# cluster instead of creating a new one):
# cluster = coiled.Cluster(name="aktech-d9b704a3-c")

# Connect a distributed client to the cluster and print the dashboard URL.
client = Client(cluster, timeout=30)
print('Dashboard:', client.dashboard_link)

Creating Cluster. This takes about a minute ...Checking environment images
Valid environment image found
Dashboard: http://ec2-18-191-29-206.us-east-2.compute.amazonaws.com:8787
CPU times: user 176 ms, sys: 39.5 ms, total: 215 ms
Wall time: 1min 35s


In [None]:
# Disabled helper (this cell was never executed): if re-enabled, it would set
# "distributed.comm.timeouts.connect" to "60s" in the dask config of the
# scheduler and of every worker, returning the value read back from the config.
# def set_config(key, value):
#     dask.config.set({key: value})
#     return dask.config.get(key)

# client.run_on_scheduler(set_config, "distributed.comm.timeouts.connect", "60s")
# client.run(set_config, "distributed.comm.timeouts.connect", "60s")

In [30]:
# Open the Ag1000G phase 2 AR1 zarr store on Google Cloud Storage through
# fsspec, using its consolidated metadata.
ag1000g_url = "gs://ag1000g-release/phase2.AR1/variation/main/zarr/all/ag1000g.phase2.ar1"
store = fsspec.get_mapper(ag1000g_url)
callset_snps = zarr.open_consolidated(store=store)

# The "2R/calldata/GT" array (presumably genotype calls for chromosome arm 2R
# - confirm against the dataset documentation).
gt = callset_snps["2R/calldata/GT"]



In [38]:
# Wrap the zarr array as a dask array; nothing is read from storage yet.
gt_da = da.from_zarr(gt)

# Keep index 1 of the last axis and transpose the remaining 2-D array
# (assumes GT is laid out as (variants, samples, ploidy) - TODO confirm; if so,
# each row of `x` is one sample and only the second allele is kept).
x = gt_da[:, :, 1].T

# One chunk spanning all of axis 0, 100000-wide chunks along axis 1.
x = x.rechunk((-1, 100000))

# Benchmark input: the first 100 rows only. The displayed repr reports shape
# (100, 24767689) with (100, 100000) chunks.
# (A bare `x` expression used to sit before this line; a mid-cell expression is
# not displayed by default, so it has been removed as a no-op.)
z = x[:100]
z

Unnamed: 0,Array,Chunk
Bytes,2.48 GB,10.00 MB
Shape,"(100, 24767689)","(100, 100000)"
Count,5332 Tasks,248 Chunks
Type,int8,numpy.ndarray
"Array Chunk Bytes 2.48 GB 10.00 MB Shape (100, 24767689) (100, 100000) Count 5332 Tasks 248 Chunks Type int8 numpy.ndarray",24767689  100,

Unnamed: 0,Array,Chunk
Bytes,2.48 GB,10.00 MB
Shape,"(100, 24767689)","(100, 100000)"
Count,5332 Tasks,248 Chunks
Type,int8,numpy.ndarray


In [39]:
# Build the pairwise correlation-distance computation on the same input with
# each of the two implementations under comparison. The results are evaluated
# later via .compute() in the timed cells.
out_blockwise = pairwise_distance_blockwise(
    z,
    metric="correlation",
)
out_blocks = pairwise_distance_blocks(
    z,
    metric="correlation",
)

In [40]:
%%time
# Run the blockwise implementation on the cluster while saving a dask
# performance report and a task-stream plot to HTML files.
with performance_report(filename="dask-report-pairwise_distance_blockwise.html"):
    with get_task_stream(
        plot='save',
        filename="task-stream-pairwise_distance_blockwise.html",
    ) as ts:
        r1 = out_blockwise.compute()

CPU times: user 1.11 s, sys: 92 ms, total: 1.21 s
Wall time: 1min 6s


In [41]:
%%time
# Run the blocks implementation on the cluster while saving a dask
# performance report and a task-stream plot to HTML files.
with performance_report(filename="dask-report-pairwise_distance_blocks.html"):
    with get_task_stream(
        plot='save',
        filename="task-stream-pairwise_distance_blocks.html",
    ) as ts:
        r2 = out_blocks.compute()

CPU times: user 1.44 s, sys: 78.6 ms, total: 1.52 s
Wall time: 57.7 s
