In [84]:
import sys
import os
import os.path as op
import datetime
from glob import glob
import numpy as np
import pandas as pd

In [85]:
# Define directory paths
PATHS = {
    "proj": "/mnt/coredata/processing/leads",
}
PATHS["data"] = op.join(PATHS["proj"], "data")
PATHS["freesurfer"] = op.join(PATHS["data"], "freesurfer")
PATHS["metadata"] = op.join(PATHS["proj"], "metadata")
PATHS["raw"] = op.join(PATHS["data"], "raw")
PATHS["processed"] = op.join(PATHS["data"], "processed")

In [86]:
def scrape_raw(raw_dir, verbose=True):
    """Scrape raw directory for all nifti files and parse them.

    Returns a pandas DataFrame with columns:
    - subj: subject ID
    - scan_date: scan date (YYYY-MM-DD)
    - scan_type: scan type (MRI modality or PET tracer)
    - raw_petf: full path to the nifti file in raw
    """
    raw_scans = glob(op.join(raw_dir, "**", "*.nii"), recursive=True)

    output = []
    for scanf in raw_scans:
        subj = _get_subj(scanf, raw_dir)
        scan_date = _get_scan_date(scanf)
        scan_type = _get_scan_type(scanf)
        output.append([subj, scan_date, scan_type, scanf])

    cols = ["subj", "scan_date", "scan_type", "raw_petf"]
    output = pd.DataFrame(output, columns=cols)

    # Add FDG to scan type for LONI files that don't save "FDG" in filename
    output.loc[output["scan_type"].isnull(), "scan_type"] = output.loc[
        output["scan_type"].isnull(), "raw_petf"
    ].apply(lambda x: "FDG" if (op.join(raw_dir, "fdg") in x) else np.nan)

    if verbose:
        if len(output) == 0:
            print("No scans found in raw")
        else:
            print(
                f"Found {len(output)} scans from {output['subj'].nunique()} subjects in {raw_dir}"
            )
    return output


def _get_subj(filepath, raw_dir):
    """Return the subject ID from filepath to the recon'd nifti.

    Parameters
    ----------
    filepath : str
        The filepath to the reconstructed nifti.

    Returns
    -------
    subj : str
        The subject ID parsed from the input file basename.
    """
    try:
        subj = filepath.replace(raw_dir + "/", "").split("/")[1]
        if len(subj) > 0:
            return subj
        else:
            return np.nan
    except IndexError:
        return np.nan


def _get_scan_date(filepath):
    """Return the scan date from filepath to the recon'd nifti.

    Iterates over filepath directories from right to left until it finds
    a filename or directory whose first 10 characters matches the date
    format YYYY-MM-DD.

    Returns np.nan if no scan date is found, otherwise a string like
    'YYYY-MM-DD'.
    """
    for d in filepath.split(op.sep)[::-1]:
        try:
            acqdate = check_dt_fmt(d[:10], raise_error=True)
            return acqdate
        except ValueError:
            pass
    return np.nan


def check_dt_fmt(datestr, raise_error=False):
    """Return datestr if formatted like YYYY-MM-DD.

    If raise_error is True, raise a ValueError if datestr is not
    formatted like YYYY-MM-DD. Otherwise return np.nan.
    """
    try:
        datestr_to_datetime(datestr)
        return datestr
    except ValueError:
        if raise_error:
            raise ValueError(f"{datestr} is not formatted like YYYY-MM-DD")
        else:
            return np.nan


def datestr_to_datetime(datestr):
    """Convert a date string to a datetime object."""
    return datetime.datetime.strptime(datestr, "%Y-%m-%d")


def datetime_to_datestr(dt):
    """Convert a datetime object to a date string."""
    return dt.strftime("%Y-%m-%d")


def _get_scan_type(filepath, scan_type_map_file=None):
    """Parse the filepath and return the scan type."""
    if scan_type_map_file is None:
        scan_type_map_file = op.join(
            PATHS["metadata"], "ssheets", "scan_types_and_tracers.csv"
        )
    scan_type_map = (
        pd.read_csv(scan_type_map_file).set_index("name_in")["name_out"].to_dict()
    )
    basename = op.basename(filepath).lower()
    for k, v in scan_type_map.items():
        if k in basename:
            return v
    return np.nan


def date_diff(date1, date2, abs=False):
    """Return date2 - date1 in days."""
    try:
        diff = (date2 - date1).days
        if abs:
            return np.abs(diff)
        else:
            return diff
    except TypeError:
        return np.nan


def find_closest_mri(
    subj, scan_date, freesurfer_dir, limit_days=365, strict_limit=False
):
    """Return closest MRI date, days from PET, and Freesurfer path.

    Parameters
    ----------
    subj : str
        The subject ID.
    scan_date : str
        Scan date (YYYY-MM-DD) to match the closest MRI scan to.
    freesurfer_dir : str
        Path to the top-level freesurfer directory containing individual
        processed MRI directories like <subj>_<scan_date>.
    limit_days : int
        A warning is raised if no MRI scan is found within limit days
        and np.nan is returned.
    strict_limit : bool
        If True, np.nan is returned if no MRI scan is found within limit days.
    """
    proc_mris = glob(op.join(freesurfer_dir, f"{subj}_*"))
    if len(proc_mris) == 0:
        print(
            f"WARNING: {subj} scan on {scan_date} has no processed MRI scans in {freesurfer_dir}"
        )
        return np.nan, np.nan, np.nan

    proc_mri_dates = [check_dt_fmt(op.basename(p).split("_")[1]) for p in proc_mris]

    days_to_scan = []
    for d in proc_mri_dates:
        days_to_scan.append(
            date_diff(datestr_to_datetime(d), datestr_to_datetime(scan_date), abs=True)
        )
    closest_mri = proc_mris[np.argmin(days_to_scan)]
    closest_mri_date = proc_mri_dates[np.argmin(days_to_scan)]
    min_days = min(days_to_scan)

    if min_days > limit_days:
        print(
            f"WARNING: {subj} scan on {scan_date} has no matching MRI within {limit_days} days"
        )
        if strict_limit:
            return np.nan, np.nan, np.nan

    return closest_mri_date, min_days, closest_mri


def now():
    """Return the current date and time down to seconds."""
    return datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")


def glob_sort_mtime(pattern):
    """Return files matching pattern in most recent modified order.

    Returns
    -------
    files : list of str
        List of files matching pattern, sorted by most recent modified
        (files[0] is the most recently modified).
    """
    files = sorted(glob(pattern), key=op.getmtime, reverse=True)
    return files

In [87]:
# Load the most recent raw_scans df
raw_scans = pd.read_csv(
    glob_sort_mtime(op.join(PATHS["metadata"], "log", f"raw_pet_scans_*.csv"))[0]
)

print(f"raw_scans: {raw_scans.shape}")

raw_scans: (2067, 12)


In [102]:
raw_scans.isna().any(axis=1).where(True)

ValueError: Array conditional must be same shape as self

In [91]:
search_raw_dirs = [
    d
    for d in glob(op.join(PATHS["raw"], "*"))
    if (op.isdir(d) and not (d.split(op.sep)[-1] == "mri"))
]
search_raw_dirs

['/mnt/coredata/processing/leads/data/raw/fbb',
 '/mnt/coredata/processing/leads/data/raw/fdg',
 '/mnt/coredata/processing/leads/data/raw/ftp']

In [173]:
# Scrape the raw directory for all PET niftis
raw_scans = scrape_raw(PATHS["raw"])

# Get paths for the raw nifti files copied into the processed directory
# raw_cp_str = op.join(
#     PATHS["processed"],
#     "{subj}",
#     "{scan_type}_{scan_date}",
#     "{subj}_{scan_type}_{scan_date}.nii",
# )

# Search processed Freesurfer dirs for closest MRI to each PET scan
raw_scans["mri_date"], raw_scans["days_to_mri"], raw_scans["fs_dir"] = zip(
    *raw_scans.apply(
        lambda x: find_closest_mri(x["subj"], x["scan_date"], PATHS["freesurfer"]),
        axis=1,
    )
)

print(f"raw_scans: {raw_scans.shape}")

Found 2069 scans from 619 subjects in /mnt/coredata/processing/leads/data/raw
raw_scans: (2069, 7)


In [174]:
# Add processed MRI directories to raw_scans
proc_dir_old = "/mnt/coredata/Projects/LEADS/data_f7p1/processed"
mri_dirs = []
for idx, scan in raw_scans.iterrows():
    if scan["fs_dir"] is np.nan:
        mri_dirs.append(np.nan)
        continue

    subj = scan["subj"]
    mri_date = scan["fs_dir"].split("_")[1]
    proc_mri_dir = op.join(PATHS["processed"], subj, f"MRI-T1_{mri_date}")

    # Symlink to the old processed MRI directories
    for tp in range(1, 6):
        proc_mri_dir_old = op.join(
            proc_dir_old, subj, f"Timepoint{tp}", f"MRI_T1_{mri_date}"
        )
        if op.isdir(proc_mri_dir_old):
            os.makedirs(op.dirname(proc_mri_dir), exist_ok=True)
            if not op.exists(proc_mri_dir):
                os.symlink(proc_mri_dir_old, proc_mri_dir)
            break
        if tp == 4:
            proc_mri_dir_old = np.nan

    # Add the processed MRI directory to raw_scans["mri_dir"]
    if op.exists(proc_mri_dir):
        mri_dirs.append(proc_mri_dir)
    else:
        mri_dirs.append(np.nan)

raw_scans["mri_dir"] = mri_dirs

In [176]:
# Drop raw_scans rows with missing data, then sort rows.
raw_scans = (
    raw_scans.dropna()
    .sort_values(["subj", "scan_type", "scan_date"])
    .reset_index(drop=True)
)

# Convert days_to_mri to int
raw_scans["days_to_mri"] = raw_scans["days_to_mri"].astype(int)

# Add a visit column to raw_scans, with visit 1 being the earliest date
# for a given subject and scan_type, visit 2 being the next earliest
# date, and so on.
raw_scans["visit"] = raw_scans.groupby(["subj", "scan_type"]).cumcount() + 1
cols = raw_scans.columns.tolist()
cols.insert(cols.index("scan_type") + 1, cols.pop(cols.index("visit")))
raw_scans = raw_scans[cols]

print(f"raw_scans: {raw_scans.shape}")

raw_scans: (2067, 9)


In [177]:
# Add a diagnosis column to raw_scans
dxf = op.join(PATHS["metadata"], "ssheets", "LEADS_Internal_PET-Screening.xlsx")
if op.isfile(dxf):
    dx = pd.read_excel(dxf)
    dx_map = {"ID": "subj", "Cohort": "dx"}
    dx = dx.rename(columns=dx_map)[["subj", "dx"]]
    raw_scans = dx.merge(raw_scans, on="subj", how="right")

# Add controls.
subj_regf = op.join(
    PATHS["metadata"], "ssheets", "Participant Registration_vertical.csv"
)
if op.isfile(subj_regf):
    subj_reg = pd.read_csv(subj_regf)
    subj_reg_map = {"subject.label": "subj", "dd_revision_field.translated_value": "dx"}
    subj_reg = subj_reg.rename(columns=subj_reg_map)[["subj", "dx"]]
cn_subjs = subj_reg.query("(dx=='Cognitively Normal Participant')")["subj"].tolist()
raw_scans.loc[pd.isna(raw_scans["dx"]), "dx"] = raw_scans.loc[
    pd.isna(raw_scans["dx"]), "subj"
].apply(lambda x: "CN" if np.isin(x, cn_subjs) else np.nan)

In [178]:
# Find PET scans that are ready and needing to be processed
proc_pet_dirs = []
need_to_process = []
for idx, scan in raw_scans.iterrows():
    if scan["mri_dir"] is np.nan:
        proc_pet_dirs.append(np.nan)
        need_to_process.append(False)
        continue

    proc_pet_dir = op.join(
        PATHS["processed"], scan["subj"], f"{scan['scan_type']}_{scan['scan_date']}"
    )
    proc_pet_dirs.append(proc_pet_dir)
    need_to_process.append(not op.exists(proc_pet_dir))

raw_scans["proc_pet_dir"] = proc_pet_dirs
raw_scans["need_to_process"] = need_to_process

In [188]:
# Save raw_scans to a CSV file
outf = op.join(PATHS["metadata"], "log", f"raw_pet_scans_{now()}.csv")
raw_scans.to_csv(outf, index=False)
print(f"Saved raw_scans to {outf}")

Saved raw_scans to /mnt/coredata/processing/leads/metadata/log/raw_pet_scans_2024-03-27-22-54-15.csv


# Do other stuff

In [15]:
scan

subj                                                      LDS0070120
dx                                                                CN
scan_date                                                 2019-06-19
scan_type                                                        FBB
visit                                                              1
raw_petf           /mnt/coredata/processing/leads/data/raw/fbb/LD...
mri_date                                                  2019-06-20
days_to_mri                                                        1
fs_dir             /mnt/coredata/processing/leads/data/freesurfer...
mri_dir            /mnt/coredata/processing/leads/data/processed/...
proc_pet_dir       /mnt/coredata/processing/leads/data/processed/...
need_to_process                                                 True
Name: 0, dtype: object

In [21]:
scan

subj                                                      LDS0070313
dx                                                              EOAD
scan_date                                                 2022-10-12
scan_type                                                        FBB
visit                                                              2
raw_petf           /mnt/coredata/processing/leads/data/raw/fbb/LD...
mri_date                                                  2022-10-11
days_to_mri                                                        1
fs_dir             /mnt/coredata/processing/leads/data/freesurfer...
mri_dir            /mnt/coredata/processing/leads/data/processed/...
proc_pet_dir       /mnt/coredata/processing/leads/data/processed/...
need_to_process                                                 True
Name: 47, dtype: object

In [35]:
pet_dir_old, mri_dir_old

('/mnt/coredata/Projects/LEADS/data_f7p1/processed/LDS1770560/Timepoint2/FTP_2024-03-05',
 '/mnt/coredata/Projects/LEADS/data_f7p1/processed/LDS1770560/Timepoint2/MRI_T1_2024-01-29')

In [33]:
file_renaming_map = {
    "FBB": {
        op.join(pet_dir_old, "compwm_ref_mask.nii"): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
    },
    "FTP": {
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
        op.join(pet_dir_old, ""): op.join(mri_dir, ""),
    },
    "MRI-T1": {

    },
}



    "<SUBJ>_MRI_T1_<YYYY-MM-DD>_raparc+aseg_bs.nii",
    "<SUBJ>_MRI_T1_<YYYY-MM-DD>_raparc+aseg_wm.nii",
    "s<SUBJ>_MRI_T1_<YYYY-MM-DD>_raparc+aseg_wm.nii",
    "s<SUBJ>_MRI_T1_<YYYY-MM-DD>_raparc+aseg_wm_thr0p7.nii",
    "wholecbl_ref_mask.nii",

    "infcblg_ref_mask.nii",
    "rwrCerebellum-SUIT.nii",

    "iy_<SUBJ>_MRI_T1_<YYYY-MM-DD>_nu.nii",
    "<SUBJ>_MRI_T1_<YYYY-MM-DD>_nu.nii",
    "<SUBJ>_MRI_T1_<YYYY-MM-DD>_raparc+aseg.nii",
    "<SUBJ>_MRI-T1_<YYYY-MM-DD>_rbrainstem-sublabels.nii",


# copy mask files from PET to MRI proc dirs
for idx, scan in raw_scans.query("(scan_type==['FBB','FTP'])")sort_values("scan_date").iterrows():
    try:
        globstr = f"/mnt/coredata/Projects/LEADS/data_f7p1/processed/{scan['subj']}/Timepoint*/{scan['scan_type']}_{scan['scan_date']}"
        pet_dir_old = glob(globstr)[0]
    except IndexError:
        print(f"WARNING: {scan['subj']} {scan['scan_type']} scan on {scan['scan_date']} is missing a processed PET dir in {globstr}")
        continue
    try:
        globstr = f"/mnt/coredata/Projects/LEADS/data_f7p1/processed/{scan['subj']}/Timepoint*/MRI_T1_{scan['mri_date']}"
        mri_dir_old = glob(globstr)[0]
    except IndexError:
        print(f"WARNING: {scan['subj']} MRI scan on {scan['scan_date']} is missing a processed MRI dir in {globstr}")
        continue

    # copy mask files from PET to MRI proc dirs
    mri_dir = scan["mri_dir"]







In [13]:
raw_scans.query("(subj=='LDS0370008')")

Unnamed: 0,subj,dx,scan_date,scan_type,visit,raw_petf,mri_date,days_to_mri,fs_dir,mri_dir,proc_pet_dir,need_to_process
680,LDS0370008,EOAD,2018-08-15,FBB,1,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2018-08-15,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
681,LDS0370008,EOAD,2019-09-19,FBB,2,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2019-09-19,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
682,LDS0370008,EOAD,2020-10-28,FBB,3,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2020-10-29,1,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
683,LDS0370008,EOAD,2021-11-16,FBB,4,/mnt/coredata/processing/leads/data/raw/fbb/LD...,2021-11-03,13,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
684,LDS0370008,EOAD,2018-08-27,FTP,1,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2018-08-15,12,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
685,LDS0370008,EOAD,2019-10-03,FTP,2,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2019-09-19,14,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
686,LDS0370008,EOAD,2020-10-29,FTP,3,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2020-10-29,0,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True
687,LDS0370008,EOAD,2021-12-09,FTP,4,/mnt/coredata/processing/leads/data/raw/ftp/LD...,2021-11-03,36,/mnt/coredata/processing/leads/data/freesurfer...,/mnt/coredata/processing/leads/data/processed/...,/mnt/coredata/processing/leads/data/processed/...,True


In [157]:
raw_scans.drop_duplicates("subj")["dx"].value_counts(dropna=False)

EOAD       396
EOnonAD    122
CN          99
NaN          1
Name: dx, dtype: int64

raw_scans: (2067, 12)


In [11]:
# Add processed file names to output
processed_filenames = {"MRI_T1": {"raw_cp": op.join(
    PATHS["processed"],
    "{subj}",
    "{scan_type}_{scan_date}",
    "{subj}_{scan_type}_{scan_date}.nii",
)},
"FBB": {"raw_cp": op.join(

'/mnt/coredata/processing/leads/data/processed/LDS0370008/MRI-T1_2020-10-29/LDS0370008_MRI-T1_2020-10-29.nii'

0