# Case ascertainment across linked data sources and deprivation distribution in Our Future Health

## Purpose
This notebook extracts and harmonises disease case information for selected phenotypes in the Our Future Health (OFH) cohort across multiple data sources, including self-reported questionnaire data, linked hospital inpatient and outpatient records, death registrations, and cancer registry data. It also derives the distribution of area-level deprivation among OFH participants.

## Outputs
- Intermediate metadata and SQL query files used to extract diagnosis and medication fields from linked datasets.
- Aggregated case counts for selected phenotypes derived from:
  - Self-reported questionnaire diagnoses
  - NHS England admitted patient care (HES inpatient) records
  - NHS England outpatient records
  - ONS death registrations
  - NHS England cancer registry (patient tumour) data
- Aggregated medication-use case counts for selected conditions.
- Distribution of Index of Multiple Deprivation (IMD) quintiles among OFH participants.
- In-memory summary tables used for cross-cohort comparison and visualisation.

## Relationship to manuscript
Outputs from this notebook are used to generate **Figure 1** in the main text and to populate **Supplementary Table 1** (*Case–control distribution of 19 selected phenotypes and their equivalents across Our Future Health, All of Us, FinnGen, Estonian Biobank, and UK Biobank*) and **Supplementary Table 2** (*Example number of cases in each dataset for OFH participants*).

## Data and access notes
Analyses use restricted Our Future Health data accessed within the OFH Trusted Research Environment under approved study permissions. Linked health-record sources include NHS England inpatient and outpatient data, death registrations, and cancer registry records. All outputs are aggregated, non-disclosive summary statistics and comply with OFH Safe Output requirements.

## Notes
Case definitions are phenotype-specific and vary by data source. Medication-based endpoints are derived from self-reported regular medication use and are intended as complementary indicators of treatment exposure. Area-level deprivation is derived from linked inpatient records, with participants assigned to deprivation quintiles using standard Index of Multiple Deprivation groupings.


## Set up environment

In [None]:
# Import packages
import dxpy
import shlex
import subprocess
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

# Import phenofhy
import phenofhy

### Initialize Spark

In [None]:
spark = SparkSession.builder \
    .appName("Phenotype Analysis") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.kryoserializer.buffer.max", "128") \
    .getOrCreate()

### Load fields

In [None]:
files = [
    "table_s2_nhse_inpat_diag_icd_fields.csv",
    "table_s2_event_case_count_phenotypes.csv",
    "table_s2_questionnaire_diag_fields.csv",
    "table_s2_nhse_outpat_diag_icd_fields.csv",
    "table_s2_nhse_engwal_death_icd_fields.csv",
    "table_s2_nhse_eng_canreg_pattumour_icd_fields.csv",
    "table_s2_questionnaire_meds_fields.csv",
    "table_s2_meds_phenotypes.csv",
    "imd_nhse_inpat.csv"
]

phenofhy.utils.download_files([
    (str(phenofhy.utils.find_latest_dx_file_id(f)), f"inputs/{f}")
    for f in files
])

In [None]:
pheno_dfs = {f.replace('.csv', ''): pd.read_csv(f'./inputs/{f}') for f in files}
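
The keys of `pheno_dfs` are the file names with `.csv` removed, so they keep the `table_s2_` prefix. A minimal sketch of how the keys are derived, using plain strings rather than the downloaded files (the shortened list `example_files` is illustrative only):

```python
from pathlib import PurePath

# Illustrative subset of the `files` list defined above.
example_files = [
    "table_s2_event_case_count_phenotypes.csv",
    "table_s2_meds_phenotypes.csv",
    "imd_nhse_inpat.csv",
]

# Same outcome as f.replace('.csv', ''): the key is the file name without its suffix.
keys = [PurePath(f).stem for f in example_files]
print(keys)
```

Any later lookup therefore needs the full stem, for example `pheno_dfs['table_s2_meds_phenotypes']`.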

In [None]:
metadata_dfs = phenofhy.load.metadata()

## IMD chart

In [None]:
phenofhy.load.field_list(
    input_file="inputs/imd_nhse_inpat.csv", 
    output_file="outputs/intermediate/imd_nhse_inpat_metadata.csv",
)

phenofhy.extract.fields(
    input_file="outputs/intermediate/imd_nhse_inpat_metadata.csv",
    output_file="outputs/raw/imd_nhse_inpat_raw_values_query.sql", 
    cohort_key="FULL_SAMPLE_ID", 
    sql_only=True
)

raw_imd_df = phenofhy.extract.sql_to_pandas(
    "outputs/raw/imd_nhse_inpat_raw_values_query.sql"
)

In [None]:
# Keep the first row for each participant; copy so that the column assignment
# below does not trigger a SettingWithCopyWarning
df_unique = raw_imd_df.drop_duplicates(subset="nhse_eng_inpat.pid", keep="first").copy()

# Normalise the decile labels
df_unique["nhse_eng_inpat.imd04_decile"] = (
    df_unique["nhse_eng_inpat.imd04_decile"]
    .str.strip()   # remove leading/trailing spaces
    .str.lower()   # make everything lowercase
)

# Proportion of participants per decile label (missing values included)
decile_dist = df_unique["nhse_eng_inpat.imd04_decile"].value_counts(normalize=True, dropna=False)
print("\nProportions:\n", decile_dist)

In [None]:
import re  # numpy (np) and pandas (pd) are already imported in the setup cell

df = raw_imd_df.drop_duplicates(subset="nhse_eng_inpat.pid", keep="first").copy()

def pct_to_decile(s):
    if pd.isna(s): 
        return np.nan
    s = str(s).strip().lower()
    nums = [int(x) for x in re.findall(r'\d{1,2}', s)]
    if not nums:
        return np.nan
    center = np.mean(nums)                    # midpoint of the band (e.g. 10-20 -> 15)
    # if label mentions 'least' or 'less', interpret percentile relative to LEAST-deprived end
    if 'least' in s or 'less' in s:
        return int(11 - np.ceil(center / 10))   # e.g. 10% -> decile 10, 10-20 -> decile 9
    return int(np.ceil(center / 10))           # e.g. 10% -> decile 1, 10-20 -> decile 2

dec = df["nhse_eng_inpat.imd04_decile"].apply(pct_to_decile).astype("Float64")
df["imd_quintile"] = (((dec - 1) // 2) + 1).astype("Int64")   # 1 = most deprived ... 5 = least

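# Optional fallback: if a numeric IMD score is available, fill the remaining NaNs
# from quintiles of the score distribution among those participants.
# Assumption: a higher IMD score means more deprivation, so labels run 5 -> 1
# from lowest to highest score to keep 1 = most deprived.
score_col = "nhse_eng_inpat.imd04_score"
need = df["imd_quintile"].isna()
if score_col in df.columns and need.any():
    s = pd.to_numeric(df.loc[need, score_col], errors="coerce")
    try:
        df.loc[need, "imd_quintile"] = pd.qcut(s, 5, labels=[5, 4, 3, 2, 1], duplicates="drop").astype("Int64")
    except ValueError:
        # qcut fails when duplicate bin edges leave fewer than 5 bins; use equal-width bins instead
        df.loc[need, "imd_quintile"] = pd.cut(s, 5, labels=[5, 4, 3, 2, 1]).astype("Int64")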

print(df["imd_quintile"].value_counts(normalize=True, dropna=False).sort_index())
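
The decile-to-quintile step above pairs adjacent deciles. A minimal stand-alone sketch of that arithmetic follows; the helper name `decile_to_quintile` is hypothetical and the example does not touch the extracted data.

```python
def decile_to_quintile(decile):
    """Collapse an IMD decile (1-10) into a quintile (1-5).

    Deciles 1-2 map to quintile 1 (most deprived) and deciles 9-10 to
    quintile 5 (least deprived). Missing input (None) stays missing.
    """
    if decile is None:
        return None
    decile = int(decile)
    if not 1 <= decile <= 10:
        raise ValueError(f"decile out of range: {decile}")
    return (decile - 1) // 2 + 1


# Full mapping from decile to quintile.
mapping = {d: decile_to_quintile(d) for d in range(1, 11)}
print(mapping)
```

This is the same integer arithmetic as `((dec - 1) // 2) + 1` in the cell above, applied to one value at a time.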

## Table S1

In [None]:
phenofhy.load.field_list(
    input_file="inputs/table_s2_nhse_inpat_diag_icd_fields.csv", 
    output_file="outputs/intermediate/nhse_inpat_diag_fields_metadata.csv",
)

phenofhy.extract.fields(
    input_file="outputs/intermediate/nhse_inpat_diag_fields_metadata.csv",
    output_file="outputs/raw/nhse_inpat_diag_fields_raw_values_query.sql", 
    cohort_key="FULL_SAMPLE_ID", 
    sql_only=True
)

raw_nhse_inpat_df = phenofhy.extract.sql_to_pandas(
    "outputs/raw/nhse_inpat_diag_fields_raw_values_query.sql"
)

trait_to_pids, trait_counts = phenofhy.icd.match_icd_traits(
    raw_df=raw_nhse_inpat_df,
    traits_and_codes=pheno_dfs['table_s2_event_case_count_phenotypes'][["trait", "ICD_code"]],
    pid_col="nhse_eng_inpat.pid",
    prefix_if_len_at_most=3,          # 'E10' -> prefix match
    return_occurrence_counts=True,
    use_pyarrow_strings=True,         # faster & smaller if pandas>=2.0 + pyarrow installed
    chunksize=500_000                 # tune: 250k–1M depending on instance type
)

# Case counts per trait. All diagnosis fields are searched by default;
# pass primary_only=True for a strict, primary-diagnosis-only definition.
trait_counts
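
The comments above describe `prefix_if_len_at_most=3` as turning short codes such as `E10` into prefix matches. The sketch below illustrates that rule in plain Python. It is not phenofhy's implementation: the exact-match behaviour for longer codes, the dot stripping, and the function name `icd_code_matches` are all assumptions.

```python
def icd_code_matches(recorded, target, prefix_if_len_at_most=3):
    """Return True if a recorded ICD-10 code counts as a hit for `target`.

    Assumed rule: targets of length <= prefix_if_len_at_most match any
    recorded code that starts with them; longer targets must match exactly.
    """
    if recorded is None:
        return False
    # Normalise formatting so that "e11.9" and "E119" compare equal.
    rec = str(recorded).strip().upper().replace(".", "")
    tgt = str(target).strip().upper().replace(".", "")
    if not rec or not tgt:
        return False
    if len(tgt) <= prefix_if_len_at_most:
        return rec.startswith(tgt)
    return rec == tgt


print(icd_code_matches("E119", "E11"))   # three-character target: prefix match
print(icd_code_matches("E119", "E110"))  # four-character target: exact match only
```

Under this reading, raising the threshold to 4 (as in the outpatient cell) would let four-character codes act as prefixes too.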

##### Get sex-specific prostate and breast cancer diagnoses

- Update parameters below to obtain results for Table S1

In [None]:
trait_to_pids, trait_counts = phenofhy.icd.match_icd_traits(
    # Keep one sex only: 2.0 = female, 1.0 = male (switch to 1.0 for the male-only run)
    raw_df=raw_nhse_inpat_df.loc[(raw_nhse_inpat_df['participant.demog_sex_1_1'] == 2.0) |
                                 (raw_nhse_inpat_df['participant.demog_sex_2_1'] == 2.0)],
    traits_and_codes=pheno_dfs['table_s2_event_case_count_phenotypes'][["trait", "ICD_code"]],
    pid_col="nhse_eng_inpat.pid",
    prefix_if_len_at_most=3,          # 'E10' -> prefix match
    return_occurrence_counts=True,
    primary_only=False,               # all diagnosis fields; set True to restrict to diag_4_1
    use_pyarrow_strings=True,         # faster & smaller if pandas>=2.0 + pyarrow installed
    chunksize=500_000                 # tune: 250k–1M depending on RAM
)

##### Get mutually exclusive Type 1 and Type 2 diabetes case counts

In [None]:
# 1) Build a tiny traits table for just E11 and E10
diab_traits = pd.DataFrame({
    "trait": ["Type 2 diabetes", "Type 1 diabetes"],
    "ICD_code": ["E11", "E10"],   # ≤3 chars → prefix match (E11* / E10*)
})

# 2) Get participant sets per trait (uses all diag fields by default)
trait_to_pids, _ = phenofhy.icd.match_icd_traits(
    raw_df=raw_nhse_inpat_df,
    traits_and_codes=diab_traits,
    pid_col="nhse_eng_inpat.pid",
    prefix_if_len_at_most=3,      # E11/E10 behave as prefixes
    primary_only=False,           # set True to restrict to primary diagnosis
    chunksize=500_000
)

p_e11 = trait_to_pids["Type 2 diabetes"]
p_e10 = trait_to_pids["Type 1 diabetes"]

# 3) Set algebra
e11_only = p_e11 - p_e10
e10_only = p_e10 - p_e11
both     = p_e11 & p_e10
either   = p_e11 | p_e10

# 4) Counts
summary = pd.DataFrame({
    "group": ["E11 only", "E10 only", "Both E10 & E11", "Either E10 or E11"],
    "n":     [len(e11_only), len(e10_only), len(both), len(either)]
})
print(summary)
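
As a sanity check on the set algebra above, the three mutually exclusive groups should add up to the union. A toy example with made-up participant IDs (the `toy_` names are illustrative and do not overwrite the notebook variables):

```python
# Made-up participant IDs, for illustration only.
toy_e11 = {"p1", "p2", "p3", "p4"}   # participants with any E11* code
toy_e10 = {"p3", "p4", "p5"}         # participants with any E10* code

toy_e11_only = toy_e11 - toy_e10     # Type 2 codes only
toy_e10_only = toy_e10 - toy_e11     # Type 1 codes only
toy_both = toy_e11 & toy_e10         # both code families recorded
toy_either = toy_e11 | toy_e10       # at least one code family recorded

# The three disjoint groups partition the union.
assert len(toy_e11_only) + len(toy_e10_only) + len(toy_both) == len(toy_either)
print(len(toy_e11_only), len(toy_e10_only), len(toy_both), len(toy_either))
```

The same identity can be asserted on the real `e11_only`, `e10_only`, `both`, and `either` sets before reporting the counts.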

## Table S2

#### Questionnaire diagnosis cases

In [None]:
phenofhy.load.field_list(
    input_file="inputs/table_s2_questionnaire_diag_fields.csv", 
    output_file="outputs/intermediate/questionnaire_diag_fields_metadata.csv",
)

phenofhy.extract.fields(
    input_file="outputs/intermediate/questionnaire_diag_fields_metadata.csv",
    output_file="outputs/raw/questionnaire_diag_fields_raw_values_query.sql", 
    cohort_key="FULL_SAMPLE_ID", 
    sql_only=True
)

raw_questionnaire_df = phenofhy.extract.sql_to_pandas(
    "outputs/raw/questionnaire_diag_fields_raw_values_query.sql"
)

questionnaire_df = phenofhy.process.participant_fields(raw_questionnaire_df)
questionnaire_df = phenofhy.process.questionnaire_fields(questionnaire_df, derive=False)

In [None]:
def to_column_name(coding_name):
    """Map a coding name from the field list to its extracted column name."""
    if coding_name.startswith('DIAG_'):
        return f"questionnaire.{coding_name.lower()}"
    if coding_name == 'participant_id':
        return "participant.pid"
    return f"participant.{coding_name.lower()}"

questionnaire_prev = phenofhy.calculate.prevalence(
    df=questionnaire_df,
    codings=metadata_dfs["codings"],
    traits=[
        to_column_name(t)
        for t in pheno_dfs['table_s2_questionnaire_diag_fields']['coding_name']
    ],
)

#### HES admitted patient care cases

- see Table S1 above

#### ONS death registrations

In [None]:
phenofhy.load.field_list(
    input_file="inputs/table_s2_nhse_engwal_death_icd_fields.csv", 
    output_file="outputs/intermediate/nhse_engwal_death_icd_fields_metadata.csv",
)

phenofhy.extract.fields(
    input_file="outputs/intermediate/nhse_engwal_death_icd_fields_metadata.csv",
    output_file="outputs/raw/nhse_engwal_death_icd_fields_raw_values_query.sql", 
    cohort_key="FULL_SAMPLE_ID", 
    sql_only=True
)

raw_nhse_engwal_death_df = phenofhy.extract.sql_to_pandas(
    "outputs/raw/nhse_engwal_death_icd_fields_raw_values_query.sql")

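# Assumption: every extracted nhse_engwal_deaths.* column other than the pid holds
# an ICD-10 cause-of-death code; adjust this list to match the field list.
diag_cols = [
    c for c in raw_nhse_engwal_death_df.columns
    if c.startswith("nhse_engwal_deaths.") and c != "nhse_engwal_deaths.pid"
]

trait_to_pids, trait_counts = phenofhy.icd.match_icd_traits(
    raw_df=raw_nhse_engwal_death_df,
    traits_and_codes=pheno_dfs["table_s2_event_case_count_phenotypes"][["trait","ICD_code"]],
    diag_cols=diag_cols,                             # death-record ICD columns listed above
    pid_col="nhse_engwal_deaths.pid",
    prefix_if_len_at_most=3,
    return_occurrence_counts=True,
    use_pyarrow_strings=True,
    chunksize=500_000
)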

In [None]:
trait_counts.sort_values(by='trait', ascending=True)[['trait', 'participant_count']]
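
The table above reports `participant_count`, which counts distinct participants per trait. When the same person has several matching records or fields, this can be lower than the number of matching occurrences. A toy sketch of the distinction, using made-up rows; how `return_occurrence_counts=True` tallies occurrences internally is an assumption here.

```python
from collections import defaultdict

# Made-up (participant ID, trait) hits, one per matching record or field.
hits = [
    ("p1", "Type 2 diabetes"),
    ("p1", "Type 2 diabetes"),   # same participant, second matching field
    ("p2", "Type 2 diabetes"),
    ("p2", "Asthma"),
]

occurrences = defaultdict(int)    # every hit counts
participants = defaultdict(set)   # each participant counts once per trait
for pid, trait in hits:
    occurrences[trait] += 1
    participants[trait].add(pid)

for trait in sorted(occurrences):
    print(trait, len(participants[trait]), occurrences[trait])
```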

#### Cancer registry patient tumour cases

In [None]:
phenofhy.load.field_list(
    input_file="inputs/table_s2_nhse_eng_canreg_pattumour_icd_fields.csv",
    output_file="outputs/intermediate/nhse_eng_canreg_pattumour_icd_fields_metadata.csv",
)
phenofhy.extract.fields(
    input_file="outputs/intermediate/nhse_eng_canreg_pattumour_icd_fields_metadata.csv",
    output_file="outputs/raw/nhse_eng_canreg_pattumour_icd_fields_raw_values_query.sql",
    cohort_key="FULL_SAMPLE_ID",
    sql_only=True,
)

raw_nhse_eng_canreg_df = phenofhy.extract.sql_to_pandas(
    "outputs/raw/nhse_eng_canreg_pattumour_icd_fields_raw_values_query.sql"
)

trait_to_pids, trait_counts = phenofhy.icd.match_icd_traits_any(
    raw_nhse_eng_canreg_df,
    pheno_dfs["table_s2_event_case_count_phenotypes"][["trait", "ICD_code"]],
    pid_col="nhse_eng_canreg_pattumour.pid",
    diag_prefix="nhse_eng_canreg_pattumour.site",  # registry diagnosis columns
    prefix_if_len_at_most=3,
    return_occurrence_counts=True
)

trait_counts

#### Questionnaire medication cases

In [None]:
phenofhy.load.field_list(
    input_file="inputs/table_s2_questionnaire_meds_fields.csv", 
    output_file="outputs/intermediate/questionnaire_meds_fields_metadata.csv",
)

phenofhy.extract.fields(
    input_file="outputs/intermediate/questionnaire_meds_fields_metadata.csv",
    output_file="outputs/raw/questionnaire_meds_fields_raw_values_query.sql", 
    cohort_key="FULL_SAMPLE_ID", 
    sql_only=True
)

In [None]:
raw_meds_df = phenofhy.extract.sql_to_pandas(
    "outputs/raw/questionnaire_meds_fields_raw_values_query.sql"
)

In [None]:
meds_df = phenofhy.process.participant_fields(raw_meds_df)
meds_df = phenofhy.process.questionnaire_fields(meds_df, derive=False)

In [None]:
meds_prev_df = phenofhy.calculate.medication_prevalence(
    df=meds_df,
    codings=metadata_dfs['codings'],
    medication_phenotypes=pheno_dfs['table_s2_meds_phenotypes'],
    participant_col="participant.pid",
    denominator="all",                  # or "nonmissing"
    return_what="group",       # <- important
)

##### Malignant neoplasm of breast subset

- Number of participants who self-reported a malignant neoplasm of breast diagnosis and chemotherapy, hormone therapy, and/or immunotherapy/targeted therapy

In [None]:
# 1) make subset of participants who self-report the diagnosis
subset_mnb = meds_df[meds_df['questionnaire.diag_cancer_1_m'].apply(
    lambda x: (
        isinstance(x, (list, tuple, np.ndarray)) and 4 in x
    ) or (
        isinstance(x, str) and '4' in x and '[' in x and ']' in x
    )
)]

In [None]:
# 2) run prevalence on the subset and request group-level counts
mnb_group_df = phenofhy.calculate.medication_prevalence(
    df=subset_mnb,                                  # <- use subset here
    codings=metadata_dfs['codings'],
    medication_phenotypes=pheno_dfs['table_s2_meds_phenotypes'],
    participant_col="participant.pid",
    denominator="all",     # 'all' now means the number of participants in subset_mnb
    return_what="group",
)

In [None]:
mnb_group_df.loc[mnb_group_df['trait']=='Malignant neoplasm of breast']

##### Alzheimer's disease subset

In [None]:
# --- Alzheimer’s disease ---
subset_alz = meds_df[meds_df['questionnaire.diag_neuro_1_m'].apply(
    lambda x: (
        isinstance(x, (list, tuple, np.ndarray)) and 3 in x
    ) or (
        isinstance(x, str) and '3' in x and '[' in x and ']' in x
    )
)]

alz_group_df = phenofhy.calculate.medication_prevalence(
    df=subset_alz,
    codings=metadata_dfs['codings'],
    medication_phenotypes=pheno_dfs['table_s2_meds_phenotypes'],
    participant_col="participant.pid",
    denominator="all",
    return_what="group",
)

In [None]:
alz_group_df.loc[alz_group_df['trait']=="Alzheimer's disease"]

##### Type 2 diabetes subset

In [None]:
subset_t2d = meds_df[meds_df['questionnaire.diag_endocr_1_m'].apply(
    lambda x: (
        isinstance(x, (list, tuple, np.ndarray)) and 2 in x
    ) or (
        isinstance(x, str) and '2' in x and '[' in x and ']' in x
    )
)]

t2d_group_df = phenofhy.calculate.medication_prevalence(
    df=subset_t2d,
    codings=metadata_dfs['codings'],
    medication_phenotypes=pheno_dfs['table_s2_meds_phenotypes'],
    participant_col="participant.pid",
    denominator="all",
    return_what="group",
)

In [None]:
t2d_group_df.loc[t2d_group_df['trait']=="Type 2 diabetes"]
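
The three subset filters above share one membership test. In its string branch, `'4' in x` is a substring check, so a stored value such as `"[14]"` would also pass. A minimal sketch of a stricter helper follows, assuming string values are Python-style list literals such as `"[4, 14]"`; the name `has_code` is hypothetical.

```python
import ast


def has_code(value, code):
    """Return True if a multi-select answer contains the integer `code`."""
    if value is None:
        return False
    if isinstance(value, str):
        # Parse list-like strings; anything unparseable counts as no match.
        try:
            value = ast.literal_eval(value.strip())
        except (ValueError, SyntaxError):
            return False
    if isinstance(value, (int, float)):
        # Scalar answer; NaN never equals `code`, so missing floats return False.
        return value == code
    try:
        # Lists, tuples and 1-D arrays: compare element by element.
        return any(item == code for item in value)
    except TypeError:
        return False


print(has_code([2, 4], 4))
print(has_code("[14]", 4))
print(has_code("[4, 14]", 4))
```

One possible use is `meds_df['questionnaire.diag_cancer_1_m'].apply(has_code, code=4)` as the row mask. Unlike the original lambda, this helper also accepts a bare scalar such as `"4"`, so check the stored format before swapping it in.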

#### HES outpatient cases

In [None]:
phenofhy.load.field_list(
    input_file="inputs/table_s2_nhse_outpat_diag_icd_fields.csv", 
    output_file="outputs/intermediate/nhse_outpat_diag_fields_metadata.csv",
)

phenofhy.extract.fields(
    input_file="outputs/intermediate/nhse_outpat_diag_fields_metadata.csv",
    output_file="outputs/raw/nhse_outpat_diag_fields_raw_values_query.sql", 
    cohort_key="FULL_SAMPLE_ID", 
    sql_only=True
)

raw_nhse_outpat_df = phenofhy.extract.sql_to_spark(
    "outputs/raw/nhse_outpat_diag_fields_raw_values_query.sql")

In [None]:
trait_to_pids, summary_df = phenofhy.icd.match_icd_traits_spark(
    raw_nhse_outpat_df,
    pheno_dfs['table_s2_event_case_count_phenotypes'][["trait","ICD_code"]],
    pid_col="nhse_eng_outpat.pid",
    diag_prefix="nhse_eng_outpat.diag_4_",
    prefix_if_len_at_most=4,
    primary_only=False,       # all diag fields; True is strongly recommended when size matters (cuts it by ~12x)
    return_occurrence_counts=True,
    return_pids=False        # only turn on if you truly need the sets
)

In [None]:
summary_df.sort_values(by='trait', ascending=True)

### Uploads

In [None]:
# Upload entire folders
phenofhy.utils.upload_folders([
    # ("phenofhy/", "applets/phenofhy"),
    ("inputs/", "inputs/phenotypes/pheno_lists/"),
])