In [1]:
import numpy as np
import matplotlib.pyplot as plt
import genetools
import seaborn as sns

# Use seaborn's "dark" figure style for plots in this notebook.
sns.set_style(style="dark")

In [2]:
import pandas as pd

In [3]:
import dask
import dask.dataframe as dd

In [4]:
import os

In [5]:
from malid import config

Raise the Dask connection timeouts and the connection retry count for both the scheduler and the workers (see the other Dask notebooks):

In [6]:
import distributed

# Scheduler-side comm settings: longer TCP/connect timeouts and more retries.
# These only seem to be picked up by the scheduler, not by individual workers
# (workers get the same values from the YAML config file written in the next cell).
# Trailing ";" suppresses the uninformative `<dask.config.set at 0x...>` repr.
dask.config.set(
    {
        "distributed.comm.timeouts.tcp": "120s",
        "distributed.comm.timeouts.connect": "120s",
        "distributed.comm.retry.count": 5,
    }
);

<dask.config.set at 0x7f8deaf1aa00>

In [7]:
# These will be picked up by individual workers: write the same comm timeout/retry
# settings to the user-level Dask config file.
# expanduser (rather than expandvars("$HOME")) still resolves the home directory
# when $HOME is unset, instead of producing a literal "$HOME" path.
dask_config_path = os.path.expanduser("~/.config/dask/distributed.yaml")

# open(..., "w") raises FileNotFoundError if ~/.config/dask does not exist yet
# (e.g. on a fresh machine or container), so create the directory first.
os.makedirs(os.path.dirname(dask_config_path), exist_ok=True)

# NOTE: this overwrites any existing distributed.yaml in full.
with open(dask_config_path, "w", encoding="utf-8") as config_file:
    config_file.write(
        """distributed:
  comm:
    retry:
      count: 5
    timeouts:
      connect: 120s          # time before connecting fails
      tcp: 120s              # time before calling an unresponsive connection dead
    """
    )

In [8]:
from dask.distributed import Client

# Start a local Dask cluster with a multi-processing backend and connect a client to it.
# If a cluster was already opened from another notebook, see
# https://stackoverflow.com/questions/60115736/dask-how-to-connect-to-running-cluster-scheduler-and-access-total-occupancy
cluster_options = dict(
    scheduler_port=config.dask_scheduler_port,
    dashboard_address=config.dask_dashboard_address,
    n_workers=config.dask_n_workers,
    processes=True,  # worker processes rather than threads in one process
    threads_per_worker=8,
    memory_limit="auto",
    worker_dashboard_address=":0",  # start worker dashboards on random ports
)
client = Client(**cluster_options)
display(client)
# for debugging: client.restart()

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:61093/status,

0,1
Dashboard: http://127.0.0.1:61093/status,Workers: 8
Total threads: 64,Total memory: 503.56 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:61094,Workers: 8
Dashboard: http://127.0.0.1:61093/status,Total threads: 64
Started: Just now,Total memory: 503.56 GiB

0,1
Comm: tcp://127.0.0.1:44427,Total threads: 8
Dashboard: http://127.0.0.1:32789/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:44259,
Local directory: /tmp/dask-worker-space/worker-469vqfdl,Local directory: /tmp/dask-worker-space/worker-469vqfdl

0,1
Comm: tcp://127.0.0.1:36155,Total threads: 8
Dashboard: http://127.0.0.1:35205/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:38193,
Local directory: /tmp/dask-worker-space/worker-60d40pf6,Local directory: /tmp/dask-worker-space/worker-60d40pf6

0,1
Comm: tcp://127.0.0.1:42445,Total threads: 8
Dashboard: http://127.0.0.1:33847/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:35039,
Local directory: /tmp/dask-worker-space/worker-za6i5wx4,Local directory: /tmp/dask-worker-space/worker-za6i5wx4

0,1
Comm: tcp://127.0.0.1:37453,Total threads: 8
Dashboard: http://127.0.0.1:33923/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:45713,
Local directory: /tmp/dask-worker-space/worker-7do002qg,Local directory: /tmp/dask-worker-space/worker-7do002qg

0,1
Comm: tcp://127.0.0.1:33757,Total threads: 8
Dashboard: http://127.0.0.1:43199/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:44951,
Local directory: /tmp/dask-worker-space/worker-7xqxiyll,Local directory: /tmp/dask-worker-space/worker-7xqxiyll

0,1
Comm: tcp://127.0.0.1:36147,Total threads: 8
Dashboard: http://127.0.0.1:33845/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:43415,
Local directory: /tmp/dask-worker-space/worker-4hswv5je,Local directory: /tmp/dask-worker-space/worker-4hswv5je

0,1
Comm: tcp://127.0.0.1:41735,Total threads: 8
Dashboard: http://127.0.0.1:38477/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:43721,
Local directory: /tmp/dask-worker-space/worker-7k7kn_uh,Local directory: /tmp/dask-worker-space/worker-7k7kn_uh

0,1
Comm: tcp://127.0.0.1:34571,Total threads: 8
Dashboard: http://127.0.0.1:46431/status,Memory: 62.95 GiB
Nanny: tcp://127.0.0.1:39779,
Local directory: /tmp/dask-worker-space/worker-yq1md11l,Local directory: /tmp/dask-worker-space/worker-yq1md11l


In [9]:
# Columns to load from the sequences parquet dataset: the two label columns first,
# then the metadata columns we want one deduplicated row of per specimen.
_label_cols = ["participant_label", "specimen_label"]
_metadata_cols = [
    "disease",
    "disease_subtype",
    "specimen_time_point",
    "participant_age",
    "participant_description",
]
desired_cols = _label_cols + _metadata_cols

In [10]:
# Row filters passed as `filters=` to dd.read_parquet below; None means no filtering.
debug_filters = None
# For a quick debug run restricted to a single participant, use instead:
# debug_filters = [("participant_label", "==", "BFI-0007450")]

In [11]:
# Read only the metadata columns, lazily, with the pyarrow engine.
# Not fastparquet: it changes specimen labels like M54-049 to 2049-01-01 00:00:54,
# i.e. it coerces partition names to numbers or dates.
df = dd.read_parquet(
    config.paths.sequences,
    engine="pyarrow",
    columns=desired_cols,
    filters=debug_filters,
)

In [12]:
# Lazy dask DataFrame: the repr shows only dtypes and npartitions, no data yet.
df

Unnamed: 0_level_0,participant_label,specimen_label,disease,disease_subtype,specimen_time_point,participant_age,participant_description
npartitions=2545,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,category[known],category[known],category[unknown],category[unknown],object,object,object
,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...
,...,...,...,...,...,...,...


In [13]:
# Alternative NOT used: a global drop_duplicates over the whole dask DataFrame.
# It adds an aggregation step to the task graph (inspect with dask.visualize);
# the next cell avoids that step by deduplicating each partition independently.
# df = df.drop_duplicates()
# display(df)
# dask.visualize(df)

In [14]:
# Deduplicate within each partition via map_partitions, avoiding the aggregation step
# of a global drop_duplicates. This relies on our partitioning strategy: presumably all
# rows for a specimen land in the same partition (TODO confirm). Any duplicates that span
# partitions survive this step and are caught by the uniqueness check further down.
def _drop_partition_duplicates(partition):
    """Return `partition` with exact duplicate rows removed."""
    return partition.drop_duplicates()


metadata_df = df.map_partitions(_drop_partition_duplicates)

In [15]:
# Still lazy: same columns, dtypes and npartitions as `df`; nothing computed yet.
metadata_df

Unnamed: 0_level_0,participant_label,specimen_label,disease,disease_subtype,specimen_time_point,participant_age,participant_description
npartitions=2545,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
,category[known],category[known],category[unknown],category[unknown],object,object,object
,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...
,...,...,...,...,...,...,...


In [16]:
# Optional: save the task graph of metadata_df to a PDF for inspection.
# dask.visualize(metadata_df, filename="participant_specimen_metadata.dask_task_graph.pdf")

In [17]:
# Execute the graph and collect the deduplicated metadata as an in-memory pandas DataFrame.
metadata_df_c = metadata_df.compute()

In [18]:
# Index values repeat (e.g. many 0s) because each partition kept its own row index.
metadata_df_c

Unnamed: 0,participant_label,specimen_label,disease,disease_subtype,specimen_time_point,participant_age,participant_description
0,BFI-0000234,M124-S014,Healthy/Background,Healthy/Background - HIV Negative,,27,Location: USA
73117,BFI-0000234,M132-S014,Healthy/Background,Healthy/Background - HIV Negative,,27,Location: USA
0,BFI-0000254,M111-S037,HIV,HIV Broad Neutralizing,,48,Location: Tanzania
96939,BFI-0000254,M114-S037,HIV,HIV Broad Neutralizing,,48,Location: Tanzania
0,BFI-0000255,M111-S033,HIV,HIV Broad Neutralizing,,33,Location: Tanzania
...,...,...,...,...,...,...,...
0,towlerton-2022-hiv_1027,towlerton-2022-hiv_015V09002862_CFAR,HIV,HIV - before anti-retroviral therapy,,,
0,towlerton-2022-hiv_1028,towlerton-2022-hiv_015V11001386_CFAR,HIV,HIV - before anti-retroviral therapy,,,
916,towlerton-2022-hiv_1028,towlerton-2022-hiv_015V11002805_CFAR,HIV,HIV - before anti-retroviral therapy,,,
0,towlerton-2022-hiv_1029,towlerton-2022-hiv_015V11001839_CFAR,HIV,HIV - before anti-retroviral therapy,,,


In [19]:
# Order rows by disease, then by participant, for a readable export.
# NOTE(review): "disease" is categorical at this point, so it sorts by category order,
# not alphabetically (the recorded output puts Healthy/Background before CVID).
sort_order = ["disease", "participant_label"]
metadata_df_c = metadata_df_c.sort_values(by=sort_order)
metadata_df_c

Unnamed: 0,participant_label,specimen_label,disease,disease_subtype,specimen_time_point,participant_age,participant_description
0,BFI-0000234,M124-S014,Healthy/Background,Healthy/Background - HIV Negative,,27,Location: USA
73117,BFI-0000234,M132-S014,Healthy/Background,Healthy/Background - HIV Negative,,27,Location: USA
0,BFI-0002850,M124-S042,Healthy/Background,Healthy/Background - HIV Negative,,26,Location: USA
49528,BFI-0002850,M132-S040,Healthy/Background,Healthy/Background - HIV Negative,,26,Location: USA
0,BFI-0002851,M124-S041,Healthy/Background,Healthy/Background - HIV Negative,,27,Location: USA
...,...,...,...,...,...,...,...
0,ramesh-2015-ci_378,ramesh-2015-ci_378,CVID,CVID,,,
0,ramesh-2015-ci_386,ramesh-2015-ci_386,CVID,CVID,,,
0,ramesh-2015-ci_400,ramesh-2015-ci_400,CVID,CVID,,,
0,ramesh-2015-ci_441,ramesh-2015-ci_441,CVID,CVID,,,


In [20]:
# Important sanity check: every specimen row must carry a disease label.
missing_disease = metadata_df_c["disease"].isna()
if missing_disease.any():
    raise ValueError("Some specimens had disease column unset.")

In [21]:
# Sanity check: exactly one entry per (participant_label, specimen_label) pair.
# The per-partition drop_duplicates above only removes duplicates within a partition,
# so a specimen whose rows span partitions, or whose metadata differs between rows,
# would show up here as a duplicate.
# Use duplicated() rather than groupby().size(): groupby drops NaN keys by default,
# which would let rows with a missing label slip through unchecked. Raise explicitly
# (instead of `assert`) so the check also runs under `python -O` and reports offenders.
label_columns = ["participant_label", "specimen_label"]
is_duplicate_entry = metadata_df_c.duplicated(subset=label_columns, keep=False)
if is_duplicate_entry.any():
    raise ValueError(
        "Expected one entry per participant + specimen, but found duplicates:\n"
        f"{metadata_df_c.loc[is_duplicate_entry, label_columns]}"
    )

In [22]:
# Export the participant/specimen -> disease map as a TSV, without the pandas index.
metadata_output_path = (
    config.paths.dataset_specific_metadata / "participant_specimen_disease_map.tsv"
)
metadata_df_c.to_csv(metadata_output_path, sep="\t", index=False)

In [23]:
# Reload the exported TSV so the checks below run against the on-disk version.
# NOTE: the round trip does not preserve dtypes; e.g. participant_age comes back as float
# and the label/disease columns are no longer categorical.
metadata_input_path = (
    config.paths.dataset_specific_metadata / "participant_specimen_disease_map.tsv"
)
metadata_df_c = pd.read_csv(metadata_input_path, sep="\t")

In [24]:
# Number of specimen rows per disease subtype, across all diseases.
subtype_column = metadata_df_c.loc[:, "disease_subtype"]
subtype_column.value_counts()

Healthy/Background - CMV-                  481
Healthy/Background - CMV+                  392
T1D - new onset                            143
HIV Non Neutralizing                       105
Healthy/Background - T1D negative          100
                                          ... 
Covid19 - Sero-negative (ICU)                2
Healthy/Background - pediatric               1
T2D - adult                                  1
SLE One aAbs / SLE dsDNA WITH Nephritis      1
COVID-19-HUniv12Oct                          1
Name: disease_subtype, Length: 66, dtype: int64

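Confirm HIV patient numbers (unique participants per subtype). We expect the counts below. Note that the HIV-negative controls are labeled `Healthy/Background - HIV Negative`: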

```
43	HIV Negative
46	HIV Broad Neutralizing
50	HIV Non Neutralizing
```

In [25]:
# Specimens: number of rows per HIV subtype (one row per specimen)
hiv_subtypes = metadata_df_c.loc[metadata_df_c["disease"] == "HIV", "disease_subtype"]
hiv_subtypes_cat = hiv_subtypes.astype("category").cat.remove_unused_categories()
hiv_subtypes_cat.value_counts()

HIV Non Neutralizing                    105
HIV Broad Neutralizing                   92
HIV - before anti-retroviral therapy     61
Name: disease_subtype, dtype: int64

In [26]:
# Patients: number of unique participants per HIV subtype
hiv_patient_counts = (
    metadata_df_c[metadata_df_c["disease"] == "HIV"]
    .groupby("disease_subtype", observed=True)["participant_label"]
    .nunique()
)

# Actually confirm the expected HIV patient numbers listed in the markdown above,
# rather than only displaying them. The expected "HIV Negative" count corresponds to the
# "Healthy/Background - HIV Negative" subtype (disease is Healthy/Background, not HIV).
n_hiv_negative_patients = metadata_df_c.loc[
    metadata_df_c["disease_subtype"] == "Healthy/Background - HIV Negative",
    "participant_label",
].nunique()
expected_hiv_patient_counts = {
    "HIV Negative": 43,
    "HIV Broad Neutralizing": 46,
    "HIV Non Neutralizing": 50,
}
observed_hiv_patient_counts = {
    "HIV Negative": int(n_hiv_negative_patients),
    "HIV Broad Neutralizing": int(hiv_patient_counts.get("HIV Broad Neutralizing", 0)),
    "HIV Non Neutralizing": int(hiv_patient_counts.get("HIV Non Neutralizing", 0)),
}
if observed_hiv_patient_counts != expected_hiv_patient_counts:
    raise ValueError(
        f"Unexpected HIV patient counts: {observed_hiv_patient_counts}, "
        f"expected {expected_hiv_patient_counts}"
    )

hiv_patient_counts

disease_subtype
HIV - before anti-retroviral therapy    30
HIV Broad Neutralizing                  46
HIV Non Neutralizing                    50
Name: participant_label, dtype: int64

In [27]:
# healthy specimens: number of rows per Healthy/Background subtype
healthy_subtypes = metadata_df_c.loc[
    metadata_df_c["disease"] == "Healthy/Background", "disease_subtype"
]
healthy_subtypes_cat = healthy_subtypes.astype("category").cat.remove_unused_categories()
healthy_subtypes_cat.value_counts()

Healthy/Background - CMV-              481
Healthy/Background - CMV+              392
Healthy/Background - T1D negative      100
Healthy/Background - TCRBv4-control     88
Healthy/Background - HIV Negative       86
Healthy/Background (children)           46
Healthy/Background - Britanova          39
Healthy/Background - SLE Negative       31
Healthy/Background - CMV Unknown        27
Healthy/Background - CVID negative      22
Healthy/Background - RA negative        13
Healthy/Background - IBD Negative       12
Healthy/Background - Briney              8
Unaffected Control                       6
Healthy/Background - adult               2
Healthy/Background - pediatric           1
Name: disease_subtype, dtype: int64

In [28]:
# healthy patients: number of unique participants per Healthy/Background subtype
healthy_rows = metadata_df_c.loc[metadata_df_c["disease"] == "Healthy/Background"]
healthy_participants = healthy_rows.groupby("disease_subtype", observed=True)[
    "participant_label"
]
healthy_participants.nunique()

disease_subtype
Healthy/Background (children)           46
Healthy/Background - Briney              8
Healthy/Background - Britanova          36
Healthy/Background - CMV Unknown        27
Healthy/Background - CMV+              392
Healthy/Background - CMV-              481
Healthy/Background - CVID negative      22
Healthy/Background - HIV Negative       43
Healthy/Background - IBD Negative       12
Healthy/Background - RA negative        13
Healthy/Background - SLE Negative       24
Healthy/Background - T1D negative       25
Healthy/Background - TCRBv4-control     88
Healthy/Background - adult               2
Healthy/Background - pediatric           1
Unaffected Control                       6
Name: participant_label, dtype: int64

In [29]:
# Inspect the specimen rows whose subtype is "Healthy/Background - CMV Unknown".
is_cmv_unknown = metadata_df_c["disease_subtype"] == "Healthy/Background - CMV Unknown"
metadata_df_c.loc[is_cmv_unknown]

Unnamed: 0,participant_label,specimen_label,disease,disease_subtype,specimen_time_point,participant_age,participant_description
114,BFI-0003144,M64-095,Healthy/Background,Healthy/Background - CMV Unknown,,22.0,Healthy Human Control
493,emerson-2017-natgen_train_HIP01298,emerson-2017-natgen_train_HIP01298,Healthy/Background,Healthy/Background - CMV Unknown,,,
652,emerson-2017-natgen_train_HIP08200,emerson-2017-natgen_train_HIP08200,Healthy/Background,Healthy/Background - CMV Unknown,,,
747,emerson-2017-natgen_train_HIP10846,emerson-2017-natgen_train_HIP10846,Healthy/Background,Healthy/Background - CMV Unknown,,,
937,emerson-2017-natgen_train_HIP13944,emerson-2017-natgen_train_HIP13944,Healthy/Background,Healthy/Background - CMV Unknown,,,
945,emerson-2017-natgen_train_HIP13961,emerson-2017-natgen_train_HIP13961,Healthy/Background,Healthy/Background - CMV Unknown,,,
1014,emerson-2017-natgen_train_HIP14134,emerson-2017-natgen_train_HIP14134,Healthy/Background,Healthy/Background - CMV Unknown,,,
1016,emerson-2017-natgen_train_HIP14138,emerson-2017-natgen_train_HIP14138,Healthy/Background,Healthy/Background - CMV Unknown,,,
1019,emerson-2017-natgen_train_HIP14143,emerson-2017-natgen_train_HIP14143,Healthy/Background,Healthy/Background - CMV Unknown,,,
1023,emerson-2017-natgen_train_HIP14156,emerson-2017-natgen_train_HIP14156,Healthy/Background,Healthy/Background - CMV Unknown,,,


In [30]:
# covid specimens: number of rows per Covid19 subtype
covid_subtypes = metadata_df_c.loc[metadata_df_c["disease"] == "Covid19", "disease_subtype"]
covid_subtypes_cat = covid_subtypes.astype("category").cat.remove_unused_categories()
covid_subtypes_cat.value_counts()

Covid19 - Admit                             52
COVID-19-NIH/NIAID - Hospitalized           39
COVID-19-HUniv12Oct - Hospitalized          26
Covid19 - Sero-positive (ICU)               18
Covid19 - ICU                               18
Covid19 - Shomuradova - mild                10
COVID-19-ISB                                 9
Covid19 - Shomuradova - moderate/severe      7
COVID-19-NIH/NIAID - Hospitalized - ICU      7
Covid19 - Kim                                7
Covid19 - Sero-positive (Admit)              6
COVID-19-HUniv12Oct - Hospitalized - ICU     6
Covid19 - Convalescence                      5
Covid19 - Acute 2                            5
Covid19 - Acute 1                            5
Covid19 - Sero-negative (Admit)              3
Covid19 - Sero-negative (ICU)                2
COVID-19-HUniv12Oct                          1
Name: disease_subtype, dtype: int64

In [31]:
# covid patients: number of unique participants per Covid19 subtype
covid_rows = metadata_df_c.loc[metadata_df_c["disease"] == "Covid19"]
covid_participants = covid_rows.groupby("disease_subtype", observed=True)[
    "participant_label"
]
covid_participants.nunique()

disease_subtype
COVID-19-HUniv12Oct                          1
COVID-19-HUniv12Oct - Hospitalized          26
COVID-19-HUniv12Oct - Hospitalized - ICU     6
COVID-19-ISB                                 9
COVID-19-NIH/NIAID - Hospitalized           39
COVID-19-NIH/NIAID - Hospitalized - ICU      7
Covid19 - Acute 1                            5
Covid19 - Acute 2                            5
Covid19 - Admit                             41
Covid19 - Convalescence                      5
Covid19 - ICU                               15
Covid19 - Kim                                7
Covid19 - Sero-negative (Admit)              1
Covid19 - Sero-negative (ICU)                1
Covid19 - Sero-positive (Admit)              3
Covid19 - Sero-positive (ICU)                7
Covid19 - Shomuradova - mild                10
Covid19 - Shomuradova - moderate/severe      7
Name: participant_label, dtype: int64

In [32]:
# Shut down the Dask scheduler and workers started by the Client cell above.
# In the recorded run this logged "Batched Comm Closed ... CommClosedError" messages
# while the scheduler's worker connections were closing (see output below).
client.shutdown()

{"message": "Batched Comm Closed <TCP (closed) Scheduler connection to worker local=tcp://127.0.0.1:61094 remote=tcp://127.0.0.1:39332>", "time": "2024-04-01T15:32:27.849687", "exc_info": "Traceback (most recent call last):\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/distributed/batched.py\", line 115, in _background_send\n    nbytes = yield coro\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/tornado/gen.py\", line 767, in run\n    value = future.result()\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/distributed/comm/tcp.py\", line 269, in write\n    raise CommClosedError()\ndistributed.comm.core.CommClosedError"}


{"message": "Batched Comm Closed <TCP (closed) Scheduler connection to worker local=tcp://127.0.0.1:61094 remote=tcp://127.0.0.1:39330>", "time": "2024-04-01T15:32:27.913139", "exc_info": "Traceback (most recent call last):\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/distributed/batched.py\", line 115, in _background_send\n    nbytes = yield coro\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/tornado/gen.py\", line 767, in run\n    value = future.result()\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/distributed/comm/tcp.py\", line 269, in write\n    raise CommClosedError()\ndistributed.comm.core.CommClosedError"}


{"message": "Batched Comm Closed <TCP (closed) Scheduler connection to worker local=tcp://127.0.0.1:61094 remote=tcp://127.0.0.1:39338>", "time": "2024-04-01T15:32:27.914454", "exc_info": "Traceback (most recent call last):\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/distributed/batched.py\", line 115, in _background_send\n    nbytes = yield coro\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/tornado/gen.py\", line 767, in run\n    value = future.result()\n  File \"/home/maxim/miniconda/envs/cuda-env-py39/lib/python3.9/site-packages/distributed/comm/tcp.py\", line 269, in write\n    raise CommClosedError()\ndistributed.comm.core.CommClosedError"}
