# Data Processing Pipeline for Spatial LD

## Imports

```python
import re
from pathlib import Path

import polars as pl
from spatial import (
    linear_transect,
    merge_parquet_files,
    process_raw_ind_data_and_write_parquet,
    simplify_and_mutate_tree_sequences,
    spatially_sample_individuals_join_data,
)
```

## Input/Output Paths

```python
MU = 1e-8

DATA_DIR = Path("~/simulation-outputs/spatial-ld-final").expanduser()
IN_RAW_TREES = list((DATA_DIR / "trees").glob("*.trees.tsz"))
RAW_TREES = {
    # Run ids are determined by the name of the tree sequence file, set by the compute cluster
    # (dots are escaped so they only match a literal ".")
    re.search(r"trees/output-([0-9]+-[0-9]+)\.trees\.tsz", str(p)).group(1): p
    for p in IN_RAW_TREES
}
assert len(RAW_TREES) == len(IN_RAW_TREES), "run ids not unique"
RUN_IDS = sorted(RAW_TREES, key=lambda k: tuple(map(int, k.split("-"))))

RAW_IND_DF = DATA_DIR / "individuals-raw.parquet"

SIMPLIFIED_TREES_DIR = DATA_DIR / "trees-simplified"
SIMPLIFIED_TREES = {k: SIMPLIFIED_TREES_DIR / v.name for k, v in RAW_TREES.items()}
SAMPLED_IND_DF = DATA_DIR / "individuals-sampled.parquet"

PAIRWISE_STATS_TMP = [
    DATA_DIR / "div-dist-tmp" / f"{run_id}-div.parquet" for run_id in RUN_IDS
]
PAIRWISE_STATS_DF = DATA_DIR / "pairwise-stats.parquet"
```

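Run ids are parsed from the tree sequence file names and sorted as tuples of integers. A minimal sketch of why the numeric sort key matters, using hypothetical file names; `run_id` and `run_id_key` are illustrative helpers, not functions from the `spatial` package.

```python
import re
from pathlib import Path

# Dots are escaped so "." matches only a literal period, not any character.
RUN_ID_RE = re.compile(r"output-([0-9]+-[0-9]+)\.trees\.tsz$")


def run_id(path: Path) -> str:
    """Extract the run id (e.g. "12-3") from a tree sequence file name."""
    m = RUN_ID_RE.search(path.name)
    if m is None:
        raise ValueError(f"unexpected file name: {path}")
    return m.group(1)


def run_id_key(rid: str) -> tuple[int, ...]:
    """Sort key that orders run ids numerically rather than lexically."""
    return tuple(map(int, rid.split("-")))


# Hypothetical file names, for illustration only.
paths = [Path(f"trees/output-{r}.trees.tsz") for r in ("10-0", "2-1", "2-0")]
ids = [run_id(p) for p in paths]
print(sorted(ids))                  # ['10-0', '2-0', '2-1']  (lexical)
print(sorted(ids, key=run_id_key))  # ['2-0', '2-1', '10-0']  (numeric)
```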
## SLiM Simulations

I ran the SLiM simulations on our compute cluster; each simulation finished in under a day (TODO: add timing info).

The model can be seen [here](https://github.com/lkirk/tskit-ld/blob/main/spatial-ld/sim/docker/main-spatial.slim). (TODO: freeze url to version used)

The script for parameter generation is [here](https://github.com/lkirk/tskit-ld/blob/main/spatial-ld/sim/submission/gen-params). (TODO: freeze url to version used) It generates parameter files that look like [this](https://github.com/lkirk/tskit-ld/blob/main/spatial-ld/sim/submission/params/0.json). (TODO: freeze url to version used)

Each tree sequence stores the model parameters in its metadata, so all information needed for the analysis is contained in the tree sequence itself.

We begin our analysis with a directory of tree sequences.

## Read Raw Sample Data

First, we read the sample data from each tree sequence: we determine the time at which each sample was recorded and store its location, sampling time, and associated metadata.

A dataframe is produced for each tree sequence and appended to a merged dataframe. For ~4000 tree sequences, this takes about 15 GB of memory and about 35 minutes on 17 cores.

Finally, we save the merged dataframe as a Parquet file. This file also contains the metadata recorded by SLiM for each run (model parameters and SLiM settings), stored as dictionary values keyed by run id (determined in the Input/Output Paths section).

```python
# !rm {RAW_IND_DF}  # uncomment to rerun step
if not RAW_IND_DF.exists():
    process_raw_ind_data_and_write_parquet(RAW_TREES, RAW_IND_DF, n_jobs=17)
```


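The shape of the merged output (rows tagged with their run id, plus one metadata dictionary keyed by run id) can be pictured with plain Python records. This is a minimal sketch under stated assumptions: the per-individual fields (`x`, `y`, `time`) and the parameter names (`K`, `SIGMA`) are hypothetical, and the metadata is serialized to a single JSON string because file-level Parquet key/value metadata holds strings rather than nested objects.

```python
import json


def tag_rows(run_id, rows):
    """Attach the run id to each per-individual record from one tree sequence."""
    return [{"run_id": run_id, **row} for row in rows]


# Hypothetical per-run outputs: individual records plus that run's metadata.
per_run = {
    "0-0": ([{"x": 1.0, "y": 2.0, "time": 10}], {"K": 5, "SIGMA": 0.2}),
    "0-1": ([{"x": 3.5, "y": 0.5, "time": 12}], {"K": 5, "SIGMA": 0.4}),
}

merged = []
metadata = {}
for rid, (rows, meta) in per_run.items():
    merged.extend(tag_rows(rid, rows))  # one merged table for all runs
    metadata[rid] = meta                # run metadata keyed by run id

# Serialize once; a reader recovers per-run parameters with json.loads.
file_metadata = {"run_metadata": json.dumps(metadata)}
print(len(merged), sorted(json.loads(file_metadata["run_metadata"])))
```

Keeping the parameters out of the row data avoids repeating identical values for every individual of a run; they can be joined back on `run_id` when needed.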
## Spatial Sampling

Steps:
1. Create a spatial transect and label each individual by the transect square it falls in, storing the label in a dataframe column named `sample_group`.
1. Sample 50 individuals from each sample group and store them in a smaller, subsetted dataframe (written with associated metadata, as in the step above).
1. Simplify the tree sequences, keeping only the samples we're interested in.
1. Compute the generation time as the mean parent age during the sampling window (from the first to the last sample time).
1. Overlay neutral mutations onto the tree sequences with msprime, scaling the per-generation mutation rate $\mu = 10^{-8}$ by the mean generation time, i.e. $\mu / \overline{t}_{\text{gen}}$.

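Steps 4 and 5 amount to a mean over a time window followed by a division. A minimal sketch, assuming births are available as hypothetical `(birth_time, parent_age)` pairs in ticks; this record layout and the helper name are illustrative, not the `spatial` package's internals.

```python
from statistics import mean

MU = 1e-8  # per-generation mutation rate, as in the notebook


def mean_generation_time(births, t_start, t_end):
    """Mean parent age over births falling in the window [t_start, t_end].

    `births` is an iterable of (birth_time, parent_age) pairs (assumed layout).
    """
    ages = [age for t, age in births if t_start <= t <= t_end]
    if not ages:
        raise ValueError("no births in the sampling window")
    return mean(ages)


# Hypothetical births: (time, age of the parent at that time), in ticks.
births = [(95, 2), (100, 3), (105, 4), (110, 5), (120, 9)]
t_gen = mean_generation_time(births, t_start=100, t_end=110)

# Rescale the per-generation rate to a per-tick rate so that mutations accrue
# per unit of tree-sequence time; this value could then be passed as `rate`
# to msprime.sim_mutations.
mu_per_tick = MU / t_gen
print(t_gen, mu_per_tick)
```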
Outputs:
1. Dataframe with samples and their sample group + run metadata
1. Simplified and mutated tree sequences 

Notes:
* The spatial transect consists of adjacent 5 × 5 unit squares along the middle of the space, with a 5-unit buffer on all sides.
* Spatial sampling and joining take about 30 GB of memory and run in seconds (20 cores).

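The transect labelling (step 1) and per-group sampling (step 2) can be sketched as follows. The notebook calls `linear_transect(5, 5, 5)` without showing what each argument means, so this sketch assumes a square side of 5, a 5-unit buffer, a horizontal transect centered on the midline, and a `width × height` space; `transect_group` and `sample_per_group` are hypothetical names. Groups with fewer than `n_ind` members are skipped here, which is also an assumption.

```python
import random
from collections import defaultdict


def transect_group(x, y, width, height, side=5.0, buffer=5.0):
    """Index of the side x side square containing (x, y), or None.

    Squares sit edge to edge along the horizontal midline of a width x height
    space, starting and ending `buffer` units from the left and right edges.
    """
    n_squares = int((width - 2 * buffer) // side)
    lo_y, hi_y = height / 2 - side / 2, height / 2 + side / 2
    if not (lo_y <= y < hi_y) or x < buffer:
        return None
    group = int((x - buffer) // side)
    return group if group < n_squares else None


def sample_per_group(individuals, n_ind, width, height, seed=0):
    """Draw n_ind individual ids per transect square; smaller groups are skipped."""
    groups = defaultdict(list)
    for ind_id, x, y in individuals:
        g = transect_group(x, y, width, height)
        if g is not None:
            groups[g].append(ind_id)
    rng = random.Random(seed)
    return {
        g: rng.sample(members, n_ind)
        for g, members in sorted(groups.items())
        if len(members) >= n_ind
    }


# Hypothetical 25 x 15 space: three squares spanning x in [5, 20), y in [5, 10).
rng = random.Random(1)
inds = [(i, rng.uniform(0, 25), rng.uniform(0, 15)) for i in range(2000)]
picked = sample_per_group(inds, n_ind=50, width=25, height=15)
print({g: len(ids) for g, ids in picked.items()})
```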
```python
# !rm -r {SIMPLIFIED_TREES_DIR} {SAMPLED_IND_DF}  # uncomment to rerun step
if not (SIMPLIFIED_TREES_DIR.exists() and SAMPLED_IND_DF.exists()):
    # The directory may already exist if a previous run was interrupted
    SIMPLIFIED_TREES_DIR.mkdir(exist_ok=True)
    sampled = spatially_sample_individuals_join_data(
        RAW_IND_DF, n_ind=50, sample_group_fn=linear_transect(5, 5, 5)
    )
    simplify_and_mutate_tree_sequences(
        RAW_TREES,
        SIMPLIFIED_TREES,
        SAMPLED_IND_DF,
        sampled,
        RAW_IND_DF,
        mu=MU,
        n_jobs=17,
    )
```


## Divergence and Geographic Distance

I sent the jobs to compute divergence and geographic distance to our compute cluster. Each simulation takes about 30 minutes to process.

I use tskit's `TreeSequence.divergence` to compute the pairwise genetic divergence between all pairs of individuals in a given simulation run (combinations with replacement, so each individual is also paired with itself).

Next, I compute the Euclidean distance between the two individuals in each of these pairs.

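The pair list and the distance computation can be sketched with the standard library. This is an illustration with hypothetical locations, not the cluster job itself; the note on passing the pairs to tskit assumes that `sample_sets[k]` holds the nodes of individual `k`.

```python
import itertools
import math


def pairwise_index(n):
    """All pairs (i, j) with i <= j: combinations with replacement of n individuals."""
    return list(itertools.combinations_with_replacement(range(n), 2))


def pairwise_distances(locations, pairs):
    """Euclidean distance between the locations of each (i, j) pair."""
    return [math.dist(locations[i], locations[j]) for i, j in pairs]


# Hypothetical 2-D locations for three sampled individuals.
locations = [(0.0, 0.0), (3.0, 4.0), (6.0, 8.0)]
pairs = pairwise_index(len(locations))  # n * (n + 1) / 2 = 6 pairs
dists = pairwise_distances(locations, pairs)

# With tskit, the same `pairs` could serve as the `indexes` argument of
# ts.divergence(sample_sets, indexes=pairs), so that divergence and
# distance line up row for row.
print(list(zip(pairs, dists)))
```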
In this step, I load the per-run dataframes and concatenate them into a single file (TODO: delete the per-run originals). Together they are too large to concatenate in memory, so the merge is done iteratively and in parallel, taking about a minute.

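One way an iterative, parallel merge can bound memory is to read a small batch of part files in parallel and append each batch to the output before reading the next. A minimal sketch of that pattern, using CSV in place of Parquet so it runs with the standard library only; `read_part` and `merge_parts` are hypothetical helpers, not `merge_parquet_files`. With Parquet, `pyarrow.parquet.ParquetWriter` and its `write_table` method would play the role of the CSV writer.

```python
import csv
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def read_part(path):
    """Load one per-run part file (CSV here, standing in for Parquet)."""
    with open(path, newline="") as fh:
        return list(csv.DictReader(fh))


def merge_parts(parts, out_path, n_jobs=4):
    """Concatenate part files into out_path, keeping at most n_jobs parts in memory."""
    writer = None
    with open(out_path, "w", newline="") as out, ThreadPoolExecutor(n_jobs) as ex:
        for start in range(0, len(parts), n_jobs):
            batch = parts[start : start + n_jobs]
            # Read one batch in parallel; map yields results in input order.
            for rows in ex.map(read_part, batch):
                if not rows:
                    continue
                if writer is None:  # write the header once, from the first part
                    writer = csv.DictWriter(out, fieldnames=list(rows[0]))
                    writer.writeheader()
                writer.writerows(rows)


with tempfile.TemporaryDirectory() as tmpdir:
    tmpdir = Path(tmpdir)
    parts = []
    for k in range(5):  # hypothetical per-run part files
        p = tmpdir / f"{k}-div.csv"
        p.write_text(f"run_id,div\n{k},{k / 10}\n")
        parts.append(p)
    merge_parts(parts, tmpdir / "merged.csv", n_jobs=2)
    merged = read_part(tmpdir / "merged.csv")
```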
```python
# !rm {PAIRWISE_STATS_DF}  # uncomment to rerun step
if not PAIRWISE_STATS_DF.exists():
    merge_parquet_files(
        PAIRWISE_STATS_TMP, PAIRWISE_STATS_DF, n_jobs=17, metadata_from=RAW_IND_DF
    )
```
