In [20]:
import csv
import json
import math
import os

from bs4 import BeautifulSoup

# Task labels searched for in report filenames
TASKS = {
    "ANT",
    "CCTHot",
    "WATT3",
    "stopSignal",
    "twoByTwo",
    "DPX",
    "discountFix",
    "motorSelectiveStop",
    "stroop",
    "surveyMedley",
}

# Inputs: the directory holding the HTML reports, the CSV listing the metrics
# (and their thresholds), and the two text files listing report filenames,
# one file per category.
mriqc_dir = "ds004636/derivatives/mriqc"
csv_file = "quality_metrics/BOLD_quality_metrics.csv"
mriqc_failed_file = "mriqc_failed/mriqc_failed.txt"
all_included_file = "all_included/all_included.txt"

In [33]:
# Function to parse filenames
def parse_filename(filename, tasks=None):
    """Extract the subject label and the task name from a report filename.

    The base name is split on underscores. The subject is the first chunk with
    "sub-" removed. The task is read from the ``task-<label>`` chunk when that
    label is a known task. If no such chunk matches, the function falls back to
    a substring search over the base name, trying longer task names first so
    the result does not depend on set iteration order.

    Args:
        filename: Report filename, optionally with leading directories,
            e.g. "sub-s03_ses-1_task-stroop_run-1_bold.html".
        tasks: Optional iterable of known task names. Defaults to the
            module-level ``TASKS``.

    Returns:
        A ``(subject, task)`` tuple; ``task`` is ``None`` when no known task
        name is found.
    """
    known_tasks = set(TASKS if tasks is None else tasks)

    # Only the final path component carries the underscore-separated entities.
    name = os.path.basename(filename)
    parts = name.split("_")
    subject = parts[0].replace("sub-", "")

    # Preferred: exact match on the task entity, so a task name that merely
    # appears inside another entity (e.g. the subject label) cannot win.
    for part in parts:
        if part.startswith("task-"):
            # Drop a trailing extension when the task entity is the last chunk.
            label = part[len("task-"):].split(".")[0]
            if label in known_tasks:
                return subject, label

    # Fallback: substring search, longest name first, ties broken alphabetically.
    ordered = sorted(known_tasks, key=lambda t: (-len(t), t))
    task = next((t for t in ordered if t in name), None)
    return subject, task

def extract_metrics(html_path, metrics):
    """Pull the requested metrics out of the IQMs table of one HTML report.

    The report is parsed with BeautifulSoup. The table with id "iqms-table"
    that follows the element with id "other" is scanned row by row:

    * rows with two cells are read as (name, value);
    * rows with three cells are read as (prefix, suffix, value) and the metric
      name becomes "prefix_suffix";
    * rows with any other number of cells are ignored.

    Args:
        html_path: Path of the HTML report to read (opened as UTF-8).
        metrics: Container of metric names to keep.

    Returns:
        Dict mapping each kept metric name to its value, as a float when the
        text converts and as the stripped text otherwise. The dict is empty
        when the "other" element or the table is missing (a message is
        printed in that case).
    """
    with open(html_path, "r", encoding="utf-8") as handle:
        document = BeautifulSoup(handle, "html.parser")

    extracted = {}

    anchor = document.find(id="other")
    if not anchor:
        print(f" other table not found for {html_path}")
        return extracted

    iqms_table = anchor.find_next("table", {"id": "iqms-table"})
    if not iqms_table:
        print(f"iqms table not found for {html_path}")
        return extracted

    for table_row in iqms_table.find_all("tr"):
        texts = [cell.text.strip() for cell in table_row.find_all("td")]

        if len(texts) == 2:
            name, raw = texts
        elif len(texts) == 3:
            name, raw = f"{texts[0]}_{texts[1]}", texts[2]
        else:
            continue

        if name not in metrics:
            continue

        # Keep the raw text when it is not a number.
        try:
            extracted[name] = float(raw)
        except ValueError:
            extracted[name] = raw

    return extracted

In [35]:

# Organize data into JSON: quality_data[category][task][subject] -> {metric: value}
quality_data = {"mriqc_failed": {}, "all_included": {}}

# Read the metric names of interest from the "Metric" column of the CSV
# (the same column the plotting cells use to key the thresholds).
with open(csv_file, "r", newline="") as f:
    metrics = {row["Metric"] for row in csv.DictReader(f)}

# Process both filename lists
for category, txt_file in [("mriqc_failed", mriqc_failed_file), ("all_included", all_included_file)]:
    with open(txt_file, "r") as f:
        # Blank lines carry no filename; drop them up front.
        filenames = [line.strip() for line in f if line.strip()]

    for filename in filenames:
        html_path = os.path.join(mriqc_dir, filename)
        if not os.path.isfile(html_path):
            # Report the gap instead of silently leaving the subject out.
            print(f"report not found, skipping: {html_path}")
            continue

        subject, task = parse_filename(filename)
        if not task:
            print(f"no known task in filename, skipping: {filename}")
            continue

        task_entries = quality_data[category].setdefault(task, {})
        if subject in task_entries:
            # Entries are keyed by subject only, so a later report for the same
            # subject and task replaces the earlier one. Make that visible.
            print(f"replacing earlier {category}/{task} entry for subject {subject} with {filename}")
        task_entries[subject] = extract_metrics(html_path, metrics)

# Save to JSON file
with open("quality_metrics/quality_metrics.json", "w") as f:
    json.dump(quality_data, f, indent=4)

print("JSON file created successfully!")

JSON file created successfully!


In [37]:
def reformat_json(input_json, output_json):
    """Pivot the quality-metrics JSON from per-subject to per-metric layout.

    Reads ``{category: {task: {subject: {metric: value}}}}`` from
    ``input_json`` and writes ``{category: {task: {metric: {subject: value}}}}``
    to ``output_json`` (indented by 4 spaces). Only the "mriqc_failed" and
    "all_included" categories are carried over, and both keys are always
    present in the output, even when the input lacks them.

    Args:
        input_json: Path of the JSON file to read.
        output_json: Path of the JSON file to write.
    """
    with open(input_json, "r") as src:
        nested = json.load(src)

    pivoted = {}
    for category in ("mriqc_failed", "all_included"):
        by_task = pivoted.setdefault(category, {})
        for task, subjects in nested.get(category, {}).items():
            # The task entry is created even when it has no subjects.
            by_metric = by_task.setdefault(task, {})
            for subject, metrics in subjects.items():
                for metric, value in metrics.items():
                    by_metric.setdefault(metric, {})[subject] = value

    with open(output_json, "w") as dst:
        json.dump(pivoted, dst, indent=4)

In [43]:
# File paths
input_json = "quality_metrics/quality_metrics.json"
output_json = "quality_metrics/quality_metrics_reformatted.json"
metrics_csv = "quality_metrics/BOLD_quality_metrics.csv"
output_dir = "quality_metrics/plots"

# Run functions
# The plotting cells open output_json directly, so it has to be (re)generated
# here; otherwise a fresh run either fails or plots a stale file.
reformat_json(input_json, output_json)

In [72]:
import pandas as pd
import matplotlib.pyplot as plt

def plot_task_metric(task, metric, data, thresholds, save_dir="quality_metrics/plots"):
    os.makedirs(save_dir, exist_ok=True)
    
    subjects = []
    values = []
    colors = []
    categories = []
    
    # Extract data for both "mriqc_failed" and "all_included"
    for category in ["all_included","mriqc_failed"]:
        if task in data[category] and metric in data[category][task]:
            for subject, value in data[category][task][metric].items():
                subjects.append(subject)
                values.append(value)
                categories.append(category)  # Keep track of whether it's "mriqc_failed" or "all_included"
    
    # Z-normalize values using Low, High, and Median from thresholds
    if metric in thresholds:
        (low, high, median) = thresholds[metric]
        std_dev = (high - low) / 2  # Approximate standard deviation
        values = [(v - median) / std_dev for v in values]

    # Assign colors (red if outside range, green if inside)
    for value in values:
        if -1 <= value <= 1:  # Within expected range
            colors.append("green")
        else:
            colors.append("red")
    
    # Separate "mriqc_failed" from "all_included" in the plot
    non_failed_count = sum(1 for cat in categories if cat == "all_included")
    
    fig_height = max(6, len(subjects) * 0.3)  # Store the calculated height

    # Increase figure height
    fig, ax = plt.subplots(figsize=(8, fig_height))
    
    ax.scatter(values, range(len(subjects)), c=colors, edgecolors="black", s=60)
    
    # Add a horizontal separation line between failed and included
    if non_failed_count > 0:
        ax.axhline(y=non_failed_count - 0.5, color="black", linestyle="--")

    # Set y-axis labels (subjects)
    ax.set_yticks(range(len(subjects)))
    ax.set_yticklabels(subjects)
    
    # Add plot title and labels
    plt.title(f"{task} - {metric} Plot", fontsize=14, pad=20)
    plt.xlabel("Z-Normalized Value", fontsize=12)
    plt.ylabel("Subjects", fontsize=12)
    
    # Draw shaded gray region for acceptable range (-1 to 1)
    ax.axvspan(-1, 1, color="gray", alpha=0.2)
    
    # Ensure x-axis ticks are visible
    plt.xticks(range(int(min(values)) - 1, int(max(values)) + 2))
    
    # Adjust margins
    plt.subplots_adjust(left=0.2, right=0.8, top=0.8, bottom=0.2)
    
    # Save plot
    plt.savefig(os.path.join(save_dir, f"{task}-{metric}.png"), dpi=300)
    plt.close()

In [66]:
# Load per-metric thresholds: metric name -> (low, high, median)
thresholds = dict()
with open(metrics_csv, "r", newline="") as f:
    for row in csv.DictReader(f):
        thresholds[row["Metric"]] = (
            float(row["Low"]),
            float(row["High"]),
            float(row["Median"]),
        )

# Read the reformatted JSON completely, then release the file before plotting
with open(output_json, "r") as f:
    data = json.load(f)

# One plot per (task, metric) pair. Tasks are visited in sorted order so runs
# are reproducible (TASKS is a set).
for task in sorted(TASKS):
    for metric in thresholds:
        # Skip pairs with no values in either category, so plot_task_metric
        # is never asked to draw an empty plot.
        has_values = any(
            data.get(category, {}).get(task, {}).get(metric)
            for category in ("all_included", "mriqc_failed")
        )
        if not has_values:
            continue
        plot_task_metric(task, metric, data, thresholds, save_dir="quality_metrics/plots")


In [None]:
# Rebuild the threshold table from the CSV: metric name -> (low, high, median)
with open(metrics_csv, "r") as f:
    thresholds = {
        row["Metric"]: (float(row["Low"]), float(row["High"]), float(row["Median"]))
        for row in csv.DictReader(f)
    }


# Reload the reformatted JSON and draw a single plot (task "ANT", metric "dvars_vstd")
with open(output_json, "r") as f:
    data = json.load(f)
    plot_task_metric("ANT", "dvars_vstd", data, thresholds, save_dir="quality_metrics/plots")