In [1]:
import pathlib
from dataclasses import dataclass
from functools import reduce

import numpy as np
import pandas as pd
import polars as pl
import tqdm.notebook as tqdm

In [2]:
@dataclass
class RandomPhenotype:
    """Definition of one synthetic phenotype derived from existing feature columns.

    Attributes:
        name: Label for the phenotype; used as the alias of the derived column.
        features: Names of the source columns that are combined.
        operator: How the columns are combined. The operators handled by
            ``apply_phenotype`` are ``"and"``, ``"or"``, ``"mul"`` and ``"add"``.
    """

    name: str
    features: list[str]
    operator: str

def generate_random_phenotypes(features_df, n_random, seed):
    """Draw ``n_random`` uniquely named random phenotype definitions.

    Each phenotype pairs one operator drawn from ``"and"``/``"or"``/``"mul"``
    with three distinct column names drawn from ``features_df.columns``.
    The phenotype name is the operator followed by *all* selected features,
    joined with underscores. A draw whose name was already produced is
    rejected and redrawn, so the returned names are unique.

    Args:
        features_df: Object exposing a ``columns`` sequence of feature names.
        n_random: Number of phenotypes to generate.
        seed: Seed passed to ``np.random.seed``. This reseeds NumPy's global
            legacy generator as a side effect.

    Returns:
        A list of ``n_random`` ``RandomPhenotype`` instances whose ``features``
        are plain Python lists.

    Raises:
        ValueError: If ``n_random`` is larger than the number of distinct
            (operator, ordered feature selection) combinations, in which case
            rejection sampling could never finish.
    """
    operators = ["and", "or", "mul"]
    n_selected = 3
    feature_names = features_df.columns

    # Upper bound on the number of distinct names: every operator combined with
    # every ordered selection of `n_selected` distinct features. Permutations
    # of the same features yield different names and so count separately.
    n_orderings = 1
    for offset in range(n_selected):
        n_orderings *= max(len(feature_names) - offset, 0)
    max_unique = len(operators) * n_orderings
    if n_random > max_unique:
        raise ValueError(
            f"Cannot generate {n_random} unique random phenotypes from "
            f"{len(feature_names)} features: at most {max_unique} are possible"
        )

    results = []
    names = set()
    np.random.seed(seed)
    while len(results) < n_random:
        operator = np.random.choice(operators, size=1).item()
        # .tolist() turns the NumPy array into the list the dataclass declares.
        selection = np.random.choice(
            feature_names, size=n_selected, replace=False
        ).tolist()
        # Name after every selected feature so that phenotypes differing in any
        # feature stay distinct and the name matches what is computed.
        name = "_".join([operator, *map(str, selection)])
        if name in names:
            continue
        names.add(name)
        results.append(
            RandomPhenotype(name=name, features=selection, operator=operator)
        )
    return results

def apply_phenotypes(random_phenotypes, dataframe, check=False):
    """Evaluate every phenotype definition against ``dataframe``.

    Args:
        random_phenotypes: Iterable of phenotype definitions, each accepted by
            ``apply_phenotype``.
        dataframe: Frame providing the source columns; only its ``select``
            method is used.
        check: Currently unused; accepted for call compatibility.

    Returns:
        The result of ``dataframe.select`` over one expression per phenotype,
        in the order given.
    """
    expressions = []
    for phenotype in random_phenotypes:
        expressions.append(apply_phenotype(phenotype))
    return dataframe.select(expressions)

def apply_phenotype(phenotype):
    match phenotype.operator:
        case "and":
            return pl.min_horizontal(phenotype.features).alias(phenotype.name)
        case "or":
            return pl.max_horizontal(phenotype.features).alias(phenotype.name)
        case "mul":
            return reduce(lambda x, y: x * y, map(pl.col, phenotype.features)).alias(phenotype.name)
        case "add":
            return pl.sum_horizontal(phenotype.features).alias(phenotype.name)

In [3]:
n_random = 1000

# Define the random phenotypes once, from the columns of the original table
# (minus the "FID"/"IID" columns), and evaluate them on that table.
original_phenotypes_df = (
    pl.read_csv("data/pheno/original.tsv", separator="\t")
    .drop("FID", "IID")
)
random_phenotypes = generate_random_phenotypes(
    original_phenotypes_df,
    n_random,
    seed=0,
)
random_phenotypes_df = apply_phenotypes(random_phenotypes, original_phenotypes_df)
output_path = "data/pheno/rand_original.tsv"
random_phenotypes_df.write_csv(output_path, separator="\t")

# Evaluate the same phenotype definitions on every anon_*.tsv table. Outputs
# are written as rand_<stem>.tsv, which the "anon_*" glob does not match.
anon_paths = sorted(pathlib.Path("data/pheno").glob("anon_*.tsv"))

for anon_path in tqdm.tqdm(anon_paths):
    anon_phenotypes_df = pl.read_csv(anon_path, separator="\t").drop("n_occurrences")
    anon_random_phenotypes_df = apply_phenotypes(random_phenotypes, anon_phenotypes_df)
    output_path = "data/pheno/rand_" + anon_path.stem + ".tsv"
    anon_random_phenotypes_df.write_csv(output_path, separator="\t")

  0%|          | 0/4 [00:00<?, ?it/s]